商品中心—商品B端搜索系统的实现文档(二)

发布于:2025-06-26 ⋅ 阅读:(21) ⋅ 点赞:(0)

8.步骤四:基于索引实现搜索功能

(1)基于suggest索引的自动补全实现

实现自动补全的代码比较简单,其原理是:把搜索词汇和倒排索引里的所有前缀匹配的词条进行score比较,然后把分数最高的那些返回,其中会涉及到suggest索引的word1(IK分词器 + pinyin分词器)。

具体步骤如下:

步骤一:构建CompletionSuggestion条件
步骤二:封装搜索请求
步骤三:通过restHighLevelClient查询ElasticSearch
步骤四:获取响应中的补全的词的列表

@RestController
@RequestMapping("/api/common")
public class CommonSearchController {
    ...
    //通用服务组件
    @Autowired
    private CommonSearchService commonSearchService;

    //输入内容自动补全接口
    @GetMapping("/autoComplete")
    public JsonResult autoComplete(@RequestBody AutoCompleteRequest request) throws IOException {
        List<String> completedWords = commonSearchService.autoComplete(request);
        return JsonResult.buildSuccess(completedWords);
    }
    ...
}

@Data
public class AutoCompleteRequest {
    //索引名称
    private String indexName;
    //字段名称
    private String fieldName;
    //需要补全的词(用户输入的内容)
    private String text;
    //返回多少个补全后的词
    private int count;
}

//通用查询服务实现类
@Service
public class CommonSearchServiceImpl implements CommonSearchService {
    private static final String MY_SUGGEST = "my_suggest";

    @Autowired
    private RestHighLevelClient restHighLevelClient;

    @Override
    public List<String> autoComplete(AutoCompleteRequest request) throws IOException {
        //1.构建CompletionSuggestion条件
        CompletionSuggestionBuilder completionSuggestionBuilder = SuggestBuilders.completionSuggestion(request.getFieldName());
        completionSuggestionBuilder.prefix(request.getText());
        completionSuggestionBuilder.skipDuplicates(true);
        completionSuggestionBuilder.size(request.getCount());

        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.sort(new ScoreSortBuilder().order(SortOrder.DESC));
        searchSourceBuilder.suggest(new SuggestBuilder().addSuggestion(MY_SUGGEST, completionSuggestionBuilder));

        //2.封装搜索请求
        SearchRequest searchRequest = new SearchRequest();
        searchRequest.indices(request.getIndexName());
        searchRequest.source(searchSourceBuilder);

        //3.查询ElasticSearch
        SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);

        //4.获取响应中的补全的词的列表
        CompletionSuggestion completionSuggestion = searchResponse.getSuggest().getSuggestion(MY_SUGGEST);
        List<CompletionSuggestion.Entry.Option> options = completionSuggestion.getEntries().get(0).getOptions();

        List<String> result = new ArrayList<>();
        for (CompletionSuggestion.Entry.Option option : options) {
            result.add(option.getText().string());
        }

        return result;
    }
    ...
}

(2)输入框中的拼写纠错实现

实现拼写纠错的代码也比较简单,其原理是:把输入的有拼写错误的搜索词汇,先自动进行纠错。然后再和倒排索引里的所有匹配的词条进行score比较,最后把分数最高的那一条返回,其中会涉及到suggest索引的word2。

具体步骤如下:

步骤一:构建PhraseSuggestion条件
步骤二:封装搜索请求
步骤三:通过restHighLevelClient查询ElasticSearch
步骤四:获取响应中纠错后的词

@RestController
@RequestMapping("/api/common")
public class CommonSearchController {
    //通用服务组件
    @Autowired
    private CommonSearchService commonSearchService;
    ...

    //输入内容拼写纠错接口
    @GetMapping("/spellingCorrection")
    public JsonResult spellingCorrection(@RequestBody SpellingCorrectionRequest request) throws IOException {
        String correctedWord = commonSearchService.spellingCorrection(request);
        return JsonResult.buildSuccess(correctedWord);
    }
}

@Data
public class SpellingCorrectionRequest {
    //索引名称
    private String indexName;
    //字段名称
    private String fieldName;
    //用户输入的内容
    private String text;
}

//通用查询服务实现类
@Service
public class CommonSearchServiceImpl implements CommonSearchService {
    private static final String MY_SUGGEST = "my_suggest";

    @Autowired
    private RestHighLevelClient restHighLevelClient;
    ...

    @Override
    public String spellingCorrection(SpellingCorrectionRequest request) throws IOException {
        //1.构建PhraseSuggestion条件
        PhraseSuggestionBuilder phraseSuggestionBuilder = new PhraseSuggestionBuilder(request.getFieldName());
        phraseSuggestionBuilder.text(request.getText());
        phraseSuggestionBuilder.size(1);

        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.sort(new ScoreSortBuilder().order(SortOrder.DESC));
        searchSourceBuilder.suggest(new SuggestBuilder().addSuggestion(MY_SUGGEST, phraseSuggestionBuilder));

        //2.封装搜索请求
        SearchRequest searchRequest = new SearchRequest();
        searchRequest.indices(request.getIndexName());
        searchRequest.source(searchSourceBuilder);

        //3.查询ElasticSearch
        SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);

        //4.获取响应中纠错后的词
        PhraseSuggestion phraseSuggestion = searchResponse.getSuggest().getSuggestion(MY_SUGGEST);
        List<PhraseSuggestion.Entry.Option> options = phraseSuggestion.getEntries().get(0).getOptions();

        return Optional.ofNullable(options).filter(e -> !e.isEmpty()).map(e -> e.get(0)).map(e -> e.getText().string()).orElse("");
    }
}

(3)商品B端的商品搜索代码实现

搜索流程应为:输入搜索词 -> 拼写纠错 -> 自动补全 -> 全文检索。

具体步骤如下:

步骤一:构建match条件
步骤二:设置搜索高亮配置(现在基本面向移动端,所以高亮处理也没太必要)
步骤三:设置搜索分页参数
步骤四:封装搜索请求
步骤五:调用restHighLevelClient查询ElasticSearch
步骤六:对结果进行高亮处理

@RestController
@RequestMapping("/api/product")
public class ProductSearchController {
    //商品服务组件
    @Autowired
    private ProductService productService;

    //商品全文检索接口
    @GetMapping("/fullTextSearch")
    public JsonResult fullTextSearch(@RequestBody FullTextSearchRequest request) throws IOException {
        SearchResponse searchResponse = productService.fullTextSearch(request);
        Map<String, Object> resultMap = new HashMap<>();
        SearchHit[] hits = searchResponse.getHits().getHits();
        long totalCount = searchResponse.getHits().getTotalHits().value;
        resultMap.put("hits", hits);
        resultMap.put("totalCount", totalCount);
        resultMap.put("pageNum", request.getPageNum());
        resultMap.put("pageSize", request.getPageSize());
        return JsonResult.buildSuccess(resultMap);
    }
    ...
}

@Data
public class FullTextSearchRequest {
    //索引名字
    private String indexName;
    //查询参数:key为字段的名字,value为字段的关键词,可以指定从哪些字段里检索
    private Map<String, String> queryTexts;
    //高亮字段
    private String highLightField;
    //当前页
    private int pageNum;
    //每页条数
    private int pageSize;
}

//商品查询服务实现类
@Service
public class ProductServiceImpl implements ProductService {
    @Autowired
    private RestHighLevelClient restHighLevelClient;

    @Override
    public SearchResponse fullTextSearch(FullTextSearchRequest request) throws IOException {
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.trackTotalHits(true);

        //1.构建match条件
        request.getQueryTexts().forEach((field, text) -> {
            searchSourceBuilder.query(QueryBuilders.matchQuery(field, text));
        });

        //2.设置搜索高亮配置(现在基本面向移动端,所以高亮处理也没太必要)
        HighlightBuilder highlightBuilder = new HighlightBuilder();
        highlightBuilder.field(request.getHighLightField());
        highlightBuilder.preTags("<span stype=color:red>"); //搜索结果里,商品标题和搜索词匹配的部分会显示为红色
        highlightBuilder.postTags("</span>");
        highlightBuilder.numOfFragments(0);
        searchSourceBuilder.highlighter(highlightBuilder);

        //3.设置搜索分页参数
        int from = (request.getPageNum() - 1) * request.getPageSize();
        searchSourceBuilder.from(from);
        searchSourceBuilder.size(request.getPageSize());

        //4.封装搜索请求
        SearchRequest searchRequest = new SearchRequest(request.getIndexName());
        searchRequest.source(searchSourceBuilder);

        //5.查询ElasticSearch
        SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);

        //6.对结果进行高亮处理
        SearchHits hits = searchResponse.getHits();
        for (SearchHit hit : hits) {
            HighlightField highlightField = hit.getHighlightFields().get(request.getHighLightField());
            Map<String, Object> sourceAsMap = hit.getSourceAsMap();
            Text[] fragments = highlightField.fragments();
            StringBuilder builder = new StringBuilder();
            for (Text fragment : fragments) {
                builder.append(fragment.string());
            }
            sourceAsMap.put(request.getHighLightField(), builder.toString());
        }
        return searchResponse;
    }
    ...
}

(4)搜索结果为空时的自动推荐代码实现

如果全文检索的结果为空,那么可以继续调用自动推荐进行相似搜索。

搜索流程应为:输入搜索词 -> 拼写纠错(completion) -> 自动补全(phrase) -> 全文检索(match) -> 自动推荐(term)。

具体步骤如下:

步骤1:构建TermSuggestion条件
步骤2:封装搜索请求
步骤3:调用restHighLevelClient查询ElasticSearch
步骤4:获取响应中推荐给用户的词

@GetMapping("/recomendWhenMissing")
public JsonResult recommendWhenMissing(@RequestBody RecommendWhenMissingRequest request) throws IOException {
    String recommendWord = commonSearchService.recommendWhenMissing(request);
    return JsonResult.buildSuccess(recommendWord);
}

@Override
public String recommendWhenMissing(RecommendWhenMissingRequest request) throws IOException {
    //1.构建TermSuggestion条件
    TermSuggestionBuilder termSuggestionBuilder = new TermSuggestionBuilder(request.getFieldName());
    termSuggestionBuilder.text(request.getText());
    termSuggestionBuilder.analyzer(IK_SMART);
    termSuggestionBuilder.minWordLength(2);
    termSuggestionBuilder.stringDistance(TermSuggestionBuilder.StringDistanceImpl.NGRAM);

    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchSourceBuilder.sort(new ScoreSortBuilder().order(SortOrder.DESC));
    searchSourceBuilder.suggest(new SuggestBuilder().addSuggestion(MY_SUGGEST, termSuggestionBuilder));

    //2.封装搜索请求
    SearchRequest searchRequest = new SearchRequest();
    searchRequest.indices(request.getIndexName());
    searchRequest.source(searchSourceBuilder);

    //3.查询ElasticSearch
    SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);

    //4.获取响应中推荐给用户的词
    TermSuggestion termSuggestion = searchResponse.getSuggest().getSuggestion(MY_SUGGEST);
    List<TermSuggestion.Entry.Option> options = termSuggestion.getEntries().get(0).getOptions();

    return Optional.ofNullable(options).map(e -> e.get(0)).map(e -> e.getText().string()).orElse("");
}

(5)基于多条件对商品进行结构化搜索

具体步骤如下:

步骤1:解析queryDSL
步骤2:设置搜索分页参数
步骤3:封装搜索请求
步骤4:调用restHighLevelClient查询ElasticSearch

@RestController
@RequestMapping("/api/product")
public class ProductSearchController {
    //商品服务组件
    @Autowired
    private ProductService productService;
    ...

    //商品结构化搜索接口
    @GetMapping("/structuredSearch")
    public JsonResult structuredSearch(@RequestBody StructuredSearchRequest request) throws IOException {
        SearchResponse searchResponse = productService.structuredSearch(request);
        Map<String, Object> resultMap = new HashMap<>();
        SearchHit[] hits = searchResponse.getHits().getHits();
        long totalCount = searchResponse.getHits().getTotalHits().value;
        resultMap.put("hits", hits);
        resultMap.put("totalCount", totalCount);
        resultMap.put("pageNum", request.getPageNum());
        resultMap.put("pageSize", request.getPageSize());
        return JsonResult.buildSuccess(resultMap);
    }
}

@Data
public class StructuredSearchRequest {
    //索引名字
    private String indexName;
    //Query DSL:ES查询语法,是按照JSON来组织
    //按照ElasticSearch的规范写的Query DSL,是一个JSON对象
    //解析的时候转成JSON字符串,客户端API可以直接解析字符串
    private Map<String, Object> queryDsl;
    //当前页
    private int pageNum;
    //每页条数
    private int pageSize;
}

@Service
public class ProductServiceImpl implements ProductService {
    @Autowired
    private RestHighLevelClient restHighLevelClient;
    ...

    @Override
    public SearchResponse structuredSearch(StructuredSearchRequest request) throws IOException {
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.trackTotalHits(true);

        //1.解析queryDSL
        String queryDsl = JSON.toJSONString(request.getQueryDsl());
        SearchModule searchModule = new SearchModule(Settings.EMPTY, false, Collections.emptyList());
        NamedXContentRegistry namedXContentRegistry = new NamedXContentRegistry(searchModule.getNamedXContents());
        XContent xContent = XContentFactory.xContent(XContentType.JSON);
        XContentParser xContentParser = xContent.createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, queryDsl);
        searchSourceBuilder.parseXContent(xContentParser);

        //2.设置搜索分页参数
        int from = (request.getPageNum() - 1) * request.getPageSize();
        searchSourceBuilder.from(from);
        searchSourceBuilder.size(request.getPageSize());

        //3.封装搜索请求
        SearchRequest searchRequest = new SearchRequest(request.getIndexName());
        searchRequest.source(searchSourceBuilder);

        //4.查询ElasticSearch
        return restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
    }
}

9.步骤五:大数据量写入ES和搜索性能的调优

(1)单线程将百万商品数据写入ES

一.创建索引

PUT /demo_plan_sku_index_01
{ 
    "settings": {
        "number_of_shards": 3,
        "number_of_replicas": 1
    },
    "mappings": {
        "properties": {
            "skuId": {
                "type": "keyword"
            },
            "skuName": {
                "type": "text",
                "analyzer": "ik_max_word",
                "search_analyzer": "ik_smart"
            },
            "category": {
                "type": "keyword"
            },
            "basePrice": {
                "type": "integer"
            },
            "vipPrice": {
                "type": "integer"
            },
            "saleCount": {
                "type": "integer"
            },
            "commentCount": {
                "type": "integer"
            },
            "skuImgUrl": {
                "type": "keyword",
                "index": false
            },
            "createTime": {
                "type": "date",
                "format": "yyyy-MM-dd HH:mm:ss"
            },
            "updateTime": {
                "type": "date",
                "format": "yyyy-MM-dd HH:mm:ss"
            }
        }
    } 
}

二.请求接口

/api/mockData/mockData1

三.请求参数

写入demo_plan_sku_index_01索引,每次批量插入1000条商品数据,一共执行1000次批量插入。

{
    "indexName":"demo_plan_sku_index_01",
    "batchSize":1000,
    "batchTimes":1000
}

四.请求响应

该次测试耗时62s,写入了100万条数据。每个线程每秒可以写入1.6万条数据,所以单线程每秒差不多执行了16个BulkRequest批量写入。60ms可以执行一次BulkRequest批量写入,每个BulkRequest会包含1000条数据。100万条数据大概会占用几百MB,所以很多数据都可以驻留在ES机器的OS Cache里,有利搜索。

{ 
    "success": true,
    "data": {
        "totalCount": 1000000,
        "elapsedSeconds": 62,
        "perSecond": 16130
    },
    "errorCode": null,
    "errorMessage": null
}

(2)多线程将百万商品数据写入ES

一.创建索引

//demo_plan_sku_index_02和demo_plan_sku_index_03一样的
PUT /demo_plan_sku_index_02
{
    "settings": {
        "number_of_shards": 3,
        "number_of_replicas": 1
    },
    "mappings": {
        "properties": {
            "skuId": {
                "type": "keyword"
            },
            "skuName": { 
                "type": "text",
                "analyzer": "ik_max_word",
                "search_analyzer": "ik_smart"
            },
            "category": {
                "type":"keyword"
            },
            "basePrice": {
                "type": "integer"
            },
            "vipPrice": {
                "type": "integer"
            },
            "saleCount": {
                "type": "integer"
            },
            "commentCount": {
                "type": "integer"
            },
            "skuImgUrl": {
                "type": "keyword",
                "index": false
            },
            "createTime": {
                "type": "date",
                "format": "yyyy-MM-dd HH:mm:ss"
            },
            "updateTime": {
                "type": "date",
                "format": "yyyy-MM-dd HH:mm:ss"
            }
        }
    } 
}

二.请求接口

/api/mockData/mockData2

三.请求参数

操作demo_plan_sku_index_02索引,每次批量插⼊1000条商品数据,⼀共执⾏1000次批量插⼊,使⽤30个线程同时执⾏。

{
    "indexName": "demo_plan_sku_index_02",
    "batchSize": 1000,
    "batchTimes": 1000,
    "threadCount": 30
}

操作demo_plan_sku_index_03索引,每次批量插⼊1000条商品数据,⼀共执⾏1000次批量插⼊,使⽤60个线程同时执⾏。

{
    "indexName": "demo_plan_sku_index_03",
    "batchSize": 1000,
    "batchTimes": 1000,
    "threadCount": 60
}

四.请求响应

该次测试耗时11秒,每秒写入9万条数据,总共使用11秒完成100万条数据的写入。由于有30个线程在并发地一起跑,所以每个线程每秒可以写入3000条数据。即每个线程每秒能将3个BulkRequest批量写入到ES,每个BulkRequest的写入需要300ms左右。

对比单线程写入百万数据到ES时,每个线程每秒可以写入1.6万条数据。而这里多线程写入百万数据到ES时,每个线程每秒才写入3000天数据。

可见,并不是线程数量越多越好。线程数量越多会导致对CPU负载和消耗越大,要耗费更多时间进行线程上下文切换。CPU负载高了之后,线程处理同样的任务,吞吐量和速率会下降。CPU只要不超过80%,其实都可以接受。

//下面是30个线程时的响应
{
    "success": true,
    "data": {
        "totalCount": 1000000,
        "elapsedSeconds": 11,
        "perSecond": 90909
    },
    "errorCode": null,
    "errorMessage": null
}
//下面是60个线程时的响应
{
    "success": true,
    "data": {
        "totalCount": 1000000,
        "elapsedSeconds": 10,
        "perSecond": 100000
    },
    "errorCode": null,
    "errorMessage": null
}

总结:多线程 + Bulk批量写入,10秒就可以完成百万级数据的写入。会有一个最佳的线程数,超过这个临界点,线程数越多反而效果会下降。

(3)数据写入到ES的存储层原理简析

首先ES会将收到的写入请求,将数据写到一个叫index buffer的JVM缓冲区中。然后会有一个线程,每隔1秒定时将这个JVM缓冲区的数据refresh刷新到OS Page Cache。当数据刷到OS Page Cache时,就可以被ES搜索到了。过一段时间后,OS Page Cache的数据会被flush到ES的磁盘文件里。

为了保证数据不丢失,会把数据也写入到内存translog里面,默认内存translog会每隔5秒进行刷盘到translog磁盘文件。

写入到单节点的数据还会进行副本复制到其他节点。

(4)将数据写入到ES的性能影响因素

因素一:refresh间隔,默认会每隔1秒刷新JVM缓冲的数据到OS Page Cache。这会影响数据写入的速度,在写入全量数据的场景,可以将间隔调大一点。比如120秒,通过减少频繁的refresh来提升性能。

因素二:副本复制会影响写入的速度。在写入全量数据的场景,同样没必要进行副本的复制。可以先将数据都写入到一个节点,之后再慢慢进行副本的复制。

因素三:index buffer的大小。在写入全量数据的场景,可以调大index buffer的大小。

因素四:translog的刷盘策略。在写入全量数据的场景,可以调整translog为异步刷盘,并且刷盘间隔调大一些。存放translog的内存大小也调大一些,让其存放更多的数据才去进行刷盘。

(5)全量数据写入ES的性能调优方案

下面这些参数的调整是针对写入全量数据的场景,全量写入完毕后应恢复原来的值。

一.调整refresh_interval参数(可以动态配置)。在全量写⼊数据的场景下,对"写⼊后1s就要能搜索到"的要求没有那么⾼。所以可以把这个值设置为120s,来减少频繁的refresh和lucene段合并⾏为。

二.调整number_of_replicas参数(可以动态配置)。ElasticSearch的副本数是可以动态调整的,写⼊时可以先把副本数设置为0,缩短数据写⼊的流程。批量导⼊完成之后,重新设置回副本数。

三.调整index_buffer_size参数。把JVM缓冲区的大小调大,可以让数据先写入到内存。避免JVM缓存区内存太小,很快写满而需要频繁刷盘。

四.调整translog参数(可以动态配置)。把translog的相关参数调大,避免尽量触发translog刷盘策略。

综上可知:首先在elasticsearch.yml中修改ES的配置,然后重启ES集群的三个节点。

$ vi /app/elasticsearch/elasticsearch-7.9.3/config/elasticsearch.yml
# 写⼊优化参数
indices.memory.index_buffer_size: 30%
indices.memory.min_index_buffer_size: 128m

然后在创建索引时对索引进行如下配置:

{
    "settings": {
        "number_of_shards": 3,
        "number_of_replicas": 0,
        "index.refresh_interval": "120s",
        "index.translog.durability": "async",
        "index.translog.sync_interval": "120s",
        "index.translog.flush_threshold_size": "2048mb"
    }
}

(6)百万商品数据写入ES的调优性能

可见,调优后的写入性能提升了一倍多。完成全量数据写入ES后,就可以动态调整索引的settings来恢复默认的配置。

(7)亿级商品数据的搜索性能测试

一.全文搜索测试

请求接口:

/api/product/fullTextSearch

请求参数:

{
    "pageNum": 1,
    "pageSize": 100,
    "indexName": "demo_plan_sku_index",
    "highLightField": "skuName",
    "queryTexts": {
        "skuName": "华为⼿机"
    }
}

比如搜索"华为手机",那么首先会对搜索词使用ik_smart进行分词,分成"华为"和"手机",之后再去倒排索引里对"华为"和"手机"这两分词进行搜索。

在上亿的商品数据里进行全文检索,耗时几百ms算是很快了,符合标准。查询多次的耗时详情如下,其中匹配的文档数有35万。

二.结构化搜索测试

请求接口:

/api/product/structuredSearch

请求参数:

{
    "pageNum": 1,
    "pageSize": 100,
    "indexName": "career_plan_sku_index",
    "queryDsl": {
        "query": {
            "bool": {
                "must": [{
                    "term": {
                        "category": {
                            "value": "⼿机"
                        }
                    }
                }],
                "filter": [{
                    "range": {
                        "basePrice": {
                            "gte": 1000,
                            "lte": 3000
                        }
                    }
                }]
            }
        },
        "sort": [{
            "basePrice": {
                "order":"desc"
            }
        }]
    } 
}

比如搜索手机分类下的商品按某价格区间倒序排列,刚开始需要花几秒。因为首先根据分类和价格区间去索引里查找数据,之后还需要按照价格排序。排序的过程可能会导致大量数据从磁盘读入内存,再写入临时磁盘文件进行排序,排序之后还需要分页提取。所以第一次整个过程比较慢。

后续再次搜索时,大量数据已经读入内存,不用再去进行磁盘IO了,所以会变快。查询多次的耗时详情如下,其中匹配的文档数有35万。

(8)ES搜索性能优化的方案分析

ES的性能是特别棒的,在合理的机器配置下,其实是不怎么需要做优化的。当我们的业务遇到查询瓶颈时再根据业务场景的特点从以下⼏点看看哪个能再去优化。而且ES比较适合全文检索,根据分词进行匹配打分排序,在上亿数据量之下也有非常好的搜索效果。但是ES面对结构化搜索则不稳定,使用多个条件来进行查询、按照指定条件进行排序,可能性能很差。因为其中可能会命中大量数据,然后产生大量的临时磁盘IO。

一.ES的查询缓存会保存在OS内存中。所以需要给操作系统的内存保留足够空间,不过一般都会把机器内存的一半给JVM,另一半给OS Cache。

二.磁盘IO性能和CPU性能。对于普通的搜索,磁盘IO的性能最影响搜索性能。对与计算⽐较多的搜索,CPU的性能会是⼀个瓶颈。

三.建立预索引Pre-Index。适⽤于数字类型的字段和经常做范围搜索的场景,比如可以把数字类型的字段转换成keyword类型的字段,把range查询转换为terms查询。

四.把long类型的skuID设置为keyword类型

五.强制合并一些只读的索引,避免从多个磁盘文件去搜索。

总结:最关键的其实是给OS Cache多预留一些内存,尽量让节点的数据都能加载到内存里。比如节点是32G内存的,16G给JVM,16G给OS Cache,然后节点的数据也控制好在16G内。否则如果OS Cache才16G,但节点的数据已经几百G了,那搜索时必然会进行大量的磁盘IO。也就是要想让ES提升搜索性能,主要靠将数据驻留在OS Cache里。所以要用大内存机器部署ES节点,尽量让每个节点上的主shard的数据量和OS Cache的内存量差不多。这样在搜索时,尽可能去OS Cache里查询数据,从而避免进行磁盘IO。

10.elasticsearch-analysis-ik⼯程的表结构

⼀共有两张表:extension_word扩展词库表,stop_word停⽤词库表。

CREATE TABLE `extension_word` (
    `id` int(11) NOT NULL AUTO_INCREMENT,
    `word` varchar(64) NOT NULL,
    `create_time` datetime NOT NULL,
    `update_time` datetime NOT NULL,
    PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

CREATE TABLE `stop_word` (
    `id` int(11) NOT NULL AUTO_INCREMENT,
    `word` varchar(64) NOT NULL,
    `create_time` datetime NOT NULL,
    `update_time` datetime NOT NULL,
    PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

11.elasticsearch-analysis-ik⼯程的执行步骤

步骤一:读取数据库连接配置⽂件

步骤二:连接数据库

步骤三:查询扩展词库表和停⽤词库表

步骤四:添加到字典中

步骤五:使⽤⼀个线程周期性执⾏上⾯2-4步

12.elasticsearch-analysis-ik⼯程的代码

(1)添加的DictLoader类

代码位置:

org.wltea.analyzer.dic.DictLoader

//加载MySQL中的词库内容,单例
public class DictLoader {
    private static final Logger LOGGER = ESPluginLoggerFactory.getLogger(DictLoader.class.getName());
    private static final DictLoader INSTANCE = new DictLoader();
    private final String url;
    private final String username;
    private final String password;
    private final AtomicBoolean extensionWordFistLoad = new AtomicBoolean(false);
    private final AtomicReference<String> extensionWordLastLoadTimeRef = new AtomicReference<>(null);
    private final AtomicBoolean stopWordFistLoad = new AtomicBoolean(false);
    private final AtomicReference<String> stopWordLastLoadTimeRef = new AtomicReference<>(null);

    //单例类,构造函数是私有的
    private DictLoader() {
        //创建一个Properties配置数据对象,用来获取MySQL JDBC连接的配置
        Properties mysqlConfig = new Properties();
        //PathUtils会从指定目录下,对指定的文件名进行拼接,然后返回全路径名
        //所以这里会把"IK分词器配置目录 + jdbc.properties"拼接成"jdbc.properties的成全路径名"
        Path configPath = PathUtils.get(Dictionary.getSingleton().getDictRoot(), "jdbc.properties");
        try {
            //根据全路径名构建输入流,然后加载到mysqlConfig对象中,这样就可以从mysqlConfig对象读取配置值了
            mysqlConfig.load(new FileInputStream(configPath.toFile()));
            this.url = mysqlConfig.getProperty("jdbc.url");
            this.username = mysqlConfig.getProperty("jdbc.username");
            this.password = mysqlConfig.getProperty("jdbc.password");
        } catch (IOException e) {
            throw new IllegalStateException("加载jdbc.properties配置文件发生异常");
        }

        try {
            //加载MySQL驱动的类
            Class.forName("com.mysql.cj.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("加载数据库驱动时发生异常");
        }
    }

    public static DictLoader getInstance() {
        return INSTANCE;
    }

    public void loadMysqlExtensionWords() {
        //每次从MySQL里加载词库时会执行一条SQL语句
        //这时就必须要有一个和MySQL之间建立的网络连接,才能发送SQL语句出去

        //由于这里会每分钟执行一次SQL语句
        //所以每次执行SQL语句的时候就创建一个数据库的网络连接Connection,执行完SQL后再把该Connection释放即可

        Connection connection = null;
        Statement statement = null;
        ResultSet resultSet = null;

        String sql;

        //第一次执行时会通过CAS操作把extensionWordFistLoad变量由false改成true,并且查全量词汇
        //之后的执行,extensionWordFistLoad变量已经变为true,所以CAS操作会不成功,于是只查增量词汇
        if (extensionWordFistLoad.compareAndSet(false, true)) {
            //首次加载会从数据库查全量的词汇
            sql = "SELECT word FROM extension_word";
        } else {
            //后面按照最近的修改时间来加载增量的词
            sql = "SELECT word FROM extension_word WHERE update_time >= '" + extensionWordLastLoadTimeRef.get() + "'";
        }

        //每次生成了加载词库的SQL后,都会去设置一个本次加载的时间
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String nowString = dateFormat.format(new Date());
        //设置最近一次加载词库的时间,extensionWordLastLoadTimeRef也是Atomic变量,线程安全的
        extensionWordLastLoadTimeRef.set(nowString);

        //加载扩展词词库内容
        try {
            //使用传统的JDBC编程获取连接
            connection = DriverManager.getConnection(url, username, password);
            //创建statement
            statement = connection.createStatement();
            //执行SQL语句获取结果集
            resultSet = statement.executeQuery(sql);
            LOGGER.info("从MySQL加载extensionWord, sql={}", sql);

            Set<String> extensionWords = new HashSet<>();
            while (resultSet.next()) {
                String word = resultSet.getString("word");
                if (word != null) {
                    extensionWords.add(word);
                    //为了方便看日志,可以把加载到的扩展词全都打印出来了
                    LOGGER.info("从MySQL加载extensionWord, word={}", word);
                }
            }

            //放到字典里
            Dictionary.getSingleton().addWords(extensionWords);
        } catch (Exception e) {
            LOGGER.error("从MySQL加载extensionWord发生异常", e);
        } finally {
            //把结果集resultSet、statement、连接connection都进行释放
            if (resultSet != null) {
                try {
                    resultSet.close();
                } catch (SQLException e) {
                    LOGGER.error(e);
                }
            }

            if (statement != null) {
                try {
                    statement.close();
                } catch (SQLException e) {
                    LOGGER.error(e);
                }
            }

            if (connection != null) {
                try {
                    connection.close();
                } catch (SQLException e) {
                    LOGGER.error(e);
                }
            }
        }
    }

    public void loadMysqlStopWords() {
        Connection connection = null;
        Statement statement = null;
        ResultSet resultSet = null;

        String sql;
        if (stopWordFistLoad.compareAndSet(false, true)) {
            sql = "SELECT word FROM stop_word";
        } else {
            sql = "SELECT word FROM stop_word WHERE update_time >= '" + stopWordLastLoadTimeRef.get() + "'";
        }

        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String nowString = dateFormat.format(new Date());
        stopWordLastLoadTimeRef.set(nowString);

        //加载词库内容
        try {
            connection = DriverManager.getConnection(url, username, password);
            statement = connection.createStatement();
            resultSet = statement.executeQuery(sql);
            LOGGER.info("从MySQL加载stopWord, sql={}", sql);

            Set<String> stopWords = new HashSet<>();
            while (resultSet.next()) {
                String word = resultSet.getString("word");
                if (word != null) {
                    stopWords.add(word);
                    LOGGER.info("从MySQL加载stopWord,word={}", word);
                }
            }
            // 放到字典里
            Dictionary.getSingleton().addStopWords(stopWords);
        } catch (Exception e) {
            LOGGER.error("从MySQL加载extensionWord发生异常", e);
        } finally {
            if (resultSet != null) {
                try {
                    resultSet.close();
                } catch (SQLException e) {
                    LOGGER.error(e);
                }
            }

            if (statement != null) {
                try {
                    statement.close();
                } catch (SQLException e) {
                    LOGGER.error(e);
                }
            }

            if (connection != null) {
                try {
                    connection.close();
                } catch (SQLException e) {
                    LOGGER.error(e);
                }
            }
        }
    }
}

(2)修改自带的Dictionary类

代码位置:

org.wltea.analyzer.dic.Dictionary#initial

public class Dictionary {
    ...
    //词典单例实例
    private static Dictionary singleton;
    private static ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
    ...

    //词典初始化
    //由于IK Analyzer的词典采用Dictionary类的静态方法进行词典初始化
    //只有当Dictionary类被实际调用时才会开始载入词典,这将延长首次分词操作的时间
    //该方法提供了一个在应用加载阶段就初始化字典的手段
    public static synchronized void initial(Configuration cfg) {
        if (singleton == null) {
            synchronized (Dictionary.class) {
                if (singleton == null) {
                    singleton = new Dictionary(cfg);
                    singleton.loadMainDict();
                    singleton.loadSurnameDict();
                    singleton.loadQuantifierDict();
                    singleton.loadSuffixDict();
                    singleton.loadPrepDict();
                    singleton.loadStopWordDict();

                    //在这里开启一个线程,每隔一段时间去mysql里面加载一下词库里的内容
                    new Thread(() -> {
                        while (true) {
                            try {
                                DictLoader.getInstance().loadMysqlExtensionWords();
                                DictLoader.getInstance().loadMysqlStopWords();
                                TimeUnit.SECONDS.sleep(60);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    }).start();

                    if (cfg.isEnableRemoteDict()) {
                        //建立监控线程
                        for (String location : singleton.getRemoteExtDictionarys()) {
                            //10秒是初始延迟可以修改的,60是间隔时间,单位秒
                            pool.scheduleAtFixedRate(new Monitor(location), 10, 60, TimeUnit.SECONDS);
                        }
                        for (String location : singleton.getRemoteExtStopWordDictionarys()) {
                            pool.scheduleAtFixedRate(new Monitor(location), 10, 60, TimeUnit.SECONDS);
                        }
                    }
                }
            }
        }
    }
    ...
}

13.demo-product-es⼯程的介绍

(1)该⼯程⾥⾯有两个搜索相关的接⼝

一.全⽂搜索接⼝

二.结构化查询接⼝

(2)该工程有两个对⽤户输⼊进⾏处理的接⼝

一.输⼊内容⾃动补全接⼝

二.输⼊内容拼写纠错接⼝

(3)该工程有三个初始化数据的接⼝

一.单线程批量插⼊商品数据接⼝

二.多线程批量插⼊商品数据接⼝

三.单线程批量插⼊suggest数据接⼝

该⼯程依赖了ElasticSearch的rest⾼级客户端库:elasticsearch-rest-high-level-client,所有对ElasticSearch的操作都是通过rest⾼级客户端库来完成的。

14.demo-product-es⼯程的商品索引

商品索引⽤来存储所有的商品信息。

(1)索引结构

商品模型的字段以满⾜测试需要为主不复杂,⼀共有10个字段。商品的索引名为:demo_plan_sku_index_序号。因为需要做多次不同的测试,有的测试是使⽤不同的索引,⽽且在实现接⼝时并没有把接⼝写死,可以指定操作那个索引,所以索引后⾯加了⼀个序号。

索引的mappings如下:

PUT /demo_plan_sku_index_15 { 
    "settings": {
        "number_of_shards": 3,
        "number_of_replicas": 1
    },
    "mappings": {
        "properties": {
            "skuId": {
                "type": "keyword"
            },
            "skuName": {
                "type": "text",
                "analyzer": "ik_max_word",
                "search_analyzer": "ik_smart"
            },
            "category": {
                "type": "keyword"
            },
            "basePrice": {
                "type": "integer"
            },
            "vipPrice": {
                "type": "integer"
            },
            "saleCount": {
                "type": "integer"
            },
            "commentCount": {
                "type": "integer"
            },
            "skuImgUrl": {
                "type": "keyword",
                "index": false
            },
            "createTime": {
                "type": "date",
                "format": "yyyy-MM-dd HH:mm:ss"
            },
            "updateTime": {
                "type": "date",
                "format": "yyyy-MM-dd HH:mm:ss"
            }
        }
    } 
}

(2)数据类型说明

elasticsearch相关⽂档链接:

数据类型⽂档:
https://www.elastic.co/guide/en/elasticsearch/reference/7.9/mapping-types.html

text数据类型⽂档:
https://www.elastic.co/guide/en/elasticsearch/reference/7.9/text.html

keyword数据类型⽂档:
https://www.elastic.co/guide/en/elasticsearch/reference/7.9/keyword.html

数字类型⽂档:
https://www.elastic.co/guide/en/elasticsearch/reference/7.9/number.html

时间类型⽂档:
https://www.elastic.co/guide/en/elasticsearch/reference/7.9/number.html

(3)使⽤的数据类型说明

一.skuName商品名称

商品名称是⼀个字符串。我们要对商品名称进⾏全⽂检索,所以skuName字段使⽤了text类型。⽤analyzer指定使⽤ik_max_word分词器,这样在保存数据时商品名称会被尽可能多的分为多个词。⽤search_analyzer指定搜索时使⽤ik_smart分词器,这样尽可能做更符合⽤户期望的分词。

二.skuId商品id

商品id⼀般是⼀个long类型的数字。我们可以使⽤ElasticSearch的数字类型,但是我们使⽤的是keyword类型。因为⽂档⾥建议:如果没有要范围查询场景,且期望查询速度更快,数字类型的字段应使⽤keyword类型。对于商品id来说,正好是⽂档中所说的情况。

⽂档链接:
https://www.elastic.co/guide/en/elasticsearch/reference/7.9/number.html

三.category商品分类

商品分类是⼀个字符串。我们不会对商品分类做全⽂检索,⽽是对商品分类做term精准匹配的操作,所以使⽤keyword类型。

四.basePrice商品价 | vipPrice商品会员价 | saleCount商品销量 | commentCount商品评论数

这⼏个字段都是数字。对于数字类型字段,⽂档中提到应在满⾜使⽤场景要求的情况下使⽤占⽤空间更⼩的类型,这⾥我们都使⽤Integer类型。

⽂档链接:
https://www.elastic.co/guide/en/elasticsearch/reference/7.9/number.html

五.skuImgUrl商品图⽚

商品图⽚是⼀个图⽚的url地址。我们不会对这个字段做任何搜索操作,也不需要索引这个字段,所以使⽤了index:false 指定了不要索引这个字段。

⽂档链接:
https://www.elastic.co/guide/en/elasticsearch/reference/7.9/keyword.html

六.createTime创建时间和updateTime修改时间

这两个字段是时间类型的字段,对应的ElasticSearch类型为date,然后使⽤了format指定了时间的格式。

15.demo-product-es⼯程的suggest索引

suggest索引⽤来存储和⽤户输⼊⾃动补全、拼写纠错、搜索推荐相关的数据的索引。这里的搜索推荐指的是:当没有⽤户要搜索的商品时推荐其他的商品。

(1)索引结构

⼀共有两个字段:word1是⽤来做⾃动补全的,word2是⽤来做拼写纠错和搜索推荐的。

索引的mapping如下:

PUT /demo_plan_sku_suggest_15
{
    "settings": {
        "number_of_shards": 3,
        "number_of_replicas": 1,
        "analysis": {
            "analyzer": {
                "ik_and_pinyin_analyzer": {
                    "type": "custom",
                    "tokenizer": "ik_smart",
                    "filter": "my_pinyin"
                }
            },
            "filter": {
                "my_pinyin": {
                    "type": "pinyin",
                    "keep_first_letter": true,
                    "keep_full_pinyin": true,
                    "keep_original": true,
                    "remove_duplicated_term": true
                }
            }
        }
    },
    "mappings": {
        "properties": {
            "word1": {
                "type": "completion",
                "analyzer": "ik_and_pinyin_analyzer"
            },
            "word2": {
                "type": "text"
            }
        }
    } 
}

(2)数据类型说明

word1⽤来做⾃动补全的,ElasticSearch中有专⻔对应的completion数据类型。

https://www.elastic.co/guide/en/elasticsearch/reference/7.9/mapping-types.html

在上⾯创建索引时我们⾃⼰定义了⼀个analyzer:ik_and_pinyin_analyzer,这个analyzer同时使⽤了ik分词器和pinyin分词器,这样⽤户输⼊汉字或者拼⾳的时候都能做⾃动补全。

文章转载自:东阳马生架构

原文链接:商品中心—11.商品B端搜索系统的实现文档 - 东阳马生架构 - 博客园

体验地址:JNPF快速开发平台


网站公告

今日签到

点亮在社区的每一天
去签到