使用ElasticSearch实现全文检索

发布于:2024-12-18 ⋅ 阅读:(16) ⋅ 点赞:(0)

全文检索

任务描述

  • 在获取到数据之后如何在ES中进行数据建模,以方便之后搜索接口的实现
  • 接下来,要考虑的问题是,如何实现MySQL和ES的数据同步
  • 接下来是技术实现,要如何实现基于关键词进行全文检索和对于某一条数据的查询详情
  • 在接口实现之后,前端调用后端暴露的接口来进行数据获取,并在页面进行展示

技术难点

  • 数据同步
  • ES的检索的实现
  • 精确定位MySQL表中的数据

任务目标

  • 根据关键词进行全文检索
  • 查询详情

实现过程

1. java读取Json文件,并导入MySQL数据库中

public List<Workticket> getWorkticket(){
        ObjectMapper objectMapper = new ObjectMapper();
        List<Workticket> jsonObjects = null;
        try {
            jsonObjects = objectMapper.readValue(new File("D:\\data_hanchuan\\workticket.json"), List.class);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return jsonObjects;
    }

​ 上述代码将json文件的数据封装成对象,然后调用MP的批量增加方法(deviceService.saveBatch(list);),将其添加到hanchuan数据库中

2. 利用Logstah完成MySQL到ES的数据同步

【注】Logstash、ES以及Kibana必须版本一致

主要参考logstash这篇博客,完成从MySQL到ES的数据同步。下面是其中的一张表的 .conf文件(几张表就对应几个conf文件)

input {
	stdin {}
	jdbc {
		type => "jdbc"
		 # 数据库连接地址
		jdbc_connection_string => "jdbc:mysql://localhost:3306/hanchuan?characterEncoding=UTF-8&autoReconnect=true&allowPublicKeyRetrieval=true"
		 # 数据库连接账号密码;
		jdbc_user => "root"
		jdbc_password => "root"
		 # MySQL依赖包路径;
		jdbc_driver_library => "D:\apply_soft\elasticsearch_all_soft\logstash-7.6.1\bin\result\mysql-connector-j-8.0.31.jar"
		 # the name of the driver class for mysql
		jdbc_driver_class => "com.mysql.jdbc.Driver"
		 # 数据库重连尝试次数
		connection_retry_attempts => "3"
		 # 判断数据库连接是否可用,默认false不开启
		jdbc_validate_connection => "true"
		 # 数据库连接可用校验超时时间,默认3600S
		jdbc_validation_timeout => "3600"
		 # 开启分页查询(默认false不开启);
		jdbc_paging_enabled => "true"
		 # 单次分页查询条数(默认100000,若字段较多且更新频率较高,建议调低此值);
		jdbc_page_size => "3000"
		 # statement为查询数据sql,如果sql较复杂,建议配通过statement_filepath配置sql文件的存放路径;
		 # sql_last_value为内置的变量,存放上次查询结果中最后一条数据tracking_column的值,此处即为ModifyTime;
		 # statement_filepath => "mysql/jdbc.sql"
		statement => "SELECT id,defective_appearance,leakage_type,anbiao1,subsystem,duty_group,anbiao2,defective_why,
						elimination_person,department,accept_describe,accept_group from defect where id > :sql_last_value order by id desc"

		 # 是否将字段名转换为小写,默认true(如果有数据序列化、反序列化需求,建议改为false);
		lowercase_column_names => false
		 # Value can be any of: fatal,error,warn,info,debug,默认info;
		sql_log_level => warn
		 #
		 # 是否记录上次执行结果,true表示会将上次执行结果的tracking_column字段的值保存到last_run_metadata_path指定的文件中;
		record_last_run => true
		 # 需要记录查询结果某字段的值时,此字段为true,否则默认tracking_column为timestamp的值;
		use_column_value => true
		 # 需要记录的字段,用于增量同步,需是数据库字段
		tracking_column => "id"

		 # record_last_run上次数据存放位置;
		last_run_metadata_path => "result/defect/last_id.txt"
		 # 是否清除last_run_metadata_path的记录,需要增量同步时此字段必须为false;
		clean_run => false
		 #
		 # 同步频率(分 时 天 月 年),默认每分钟同步一次;
		schedule => "* * * * *"
	}
}
 
filter {
 
   mutate {  
   	//挑选其中的一个字段充当title字段
       rename => {"defective_appearance" => "title"}	
    //将其id值设置为”数据库表名001_id“ 方便之后查询详情接口的实现
       update => {"id" => "defect001_%{id}"}
    // 将其他字段填充到message字段当中
       add_field => {
           "message" =>["%{title};%{leakage_type};%{anbiao1};%{subsystem};%{duty_group};%{anbiao2};%{defective_why};%{elimination_person};%{department};%{accept_describe};%{accept_group};"]
		}
	//将多余字段删除,使表的结构始终呈现为{id,title,message}形式
       remove_field => ["leakage_type","anbiao1","subsystem","duty_group","anbiao2","defective_why","elimination_person","department","accept_describe","accept_group"]
	}
         
}

output {

	elasticsearch {
		# host => "192.168.1.1"
		# port => "9200"
		# 配置ES集群地址
		hosts => ["localhost:9200"]
		# 索引名字,必须小写
		index => "hanchuan001"
	}
	stdout {
		codec => json_lines
	}
}

最终我们ES中的数据结构就是下面这个样子

在这里插入图片描述

3. 开始编写功能接口

3.1 全文检索接口
@Override
    public MetaTotal searchAllHighLight(String msg, int pageNo, int pageSize) throws IOException {
        if (pageNo <= 1) {
            pageNo = 1;
        }
        SearchRequest request = new SearchRequest(resultIndex);
//      进行搜索
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
        boolQueryBuilder.must(QueryBuilders.matchQuery("message", msg))
                        .should(QueryBuilders.matchQuery("title", msg));

//        sourceBuilder.size(2000);

//        分页
        sourceBuilder.from(pageNo);
        sourceBuilder.size(pageSize);


//        进行高亮设置
        HighlightBuilder highlightBuilder = new HighlightBuilder();
        HighlightBuilder.Field field = new HighlightBuilder.Field("message")
                .preTags("<span style='color:red'>")
                .postTags("</span>");
        HighlightBuilder.Field field1 = new HighlightBuilder.Field("title")
                .preTags("<span style='color:red'>")
                .postTags("</span>");
        highlightBuilder.field(field).field(field1);

        sourceBuilder.query(boolQueryBuilder);
        sourceBuilder.highlighter(highlightBuilder);

//        加入到request中
        request.source(sourceBuilder);

        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        List<Meta> list = new ArrayList<>();
        for (SearchHit hit : response.getHits().getHits()) {
            //----进行高亮字段的替换
            Map<String, HighlightField> highlightFields = hit.getHighlightFields();
            HighlightField message = highlightFields.get("message");
            HighlightField title = highlightFields.get("title");

//          未高亮之前的结果
            Map<String, Object> sourceAsMap = hit.getSourceAsMap();
//            1.找到message中出现关键字的地方进行高亮替换
            if (message != null) {
                Text[] fragments = message.fragments();
                String n_mess = "";
                for (Text text : fragments) {
                    n_mess += text;
                }
                sourceAsMap.put("message", n_mess);
            }
//            2.找到title中出现关键字的地方进行高亮替换
            if (title != null) {
                Text[] fragments = title.fragments();
                String n_title = "";
                for (Text text : fragments) {
                    n_title += text;
                }
                sourceAsMap.put("title", n_title);
            }
            //----结束----

            Meta meta = new Meta();
            try {
                BeanUtils.populate(meta, hit.getSourceAsMap());
            } catch (IllegalAccessException e) {
                throw new RuntimeException(e);
            } catch (InvocationTargetException e) {
                throw new RuntimeException(e);
            }
            list.add(meta);
        }
        MetaTotal metas = new MetaTotal();
        metas.setList(list);
        metas.setTotal(response.getHits().getTotalHits().value);
        System.out.println(metas.getTotal() + "总记录数");
        return metas;
    }

在业务逻辑代码写好之后在控制层暴露接口

	@ResponseBody
    @GetMapping("/search/{keyword}/{pageNo}/{pageSize}")

    public Result searchByMsg(@PathVariable String keyword,
                              @PathVariable int pageNo,
                              @PathVariable int pageSize) throws IOException {
        MetaTotal metas = service.searchAllHighLight(keyword,pageNo,pageSize);

        Page<Meta> page = new Page<>(pageNo,pageSize);
        page.setRecords(metas.getList());
        page.setTotal(metas.getTotal());
        return new Result().code(200).message("查询成功").data("list",metas.getList()).data("total",metas.getTotal());
    }
3.2 查询详情

根据前端传递的id值,进行解析,找到对应的数据库表,进行详情查看。

	@RequestMapping("/details/{id}")
    @ResponseBody
    public Result look_details2(@PathVariable("id") String id, Map<String,Object> map){
        String[] str = id.split("001_");

        if (str[0].equals("defect")){

            Defect defect = defectService.getById(str[1]);
            return new Result().code(200).message("详情结果").data("details",defect);
        } else if (str[0].equals("device")) {
            Device device = deviceService.getById(str[1]);
            return new Result().code(200).message("详情结果").data("details",device);
        } else if (str[0].equals("riskcontroller")) {
            Riskcontroller riskcontroller = riskControllerService.getById(str[1]);
            return new Result().code(200).message("详情结果").data("details",riskcontroller);
        }else if (str[0].equals("security")) {
            Security security = securityService.getById(str[1]);
            return new Result().code(200).message("详情结果").data("details",security);
        }else if (str[0].equals("workticket")) {
            Workticket workticket = workticketService.getById(str[1]);
            return new Result().code(200).message("详情结果").data("details",workticket);
        }
        return new Result().code(500).message("查询失败");
    }

4. 前端调用

在这里插入图片描述
在这里插入图片描述