ES与关系数据库的同步练习(hotel_admin)

发布于:2024-05-06 ⋅ 阅读:(41) ⋅ 点赞:(0)

1 es与数据库同步的方法

方式一:同步调用

  • 优点:实现简单,粗暴
  • 缺点:业务耦合度高

方式二:异步通知(选择这个折中下)

  • 优点:低耦合,实现难度一般
  • 缺点:依赖mq的可靠性

方式三:监听binlog

  • 优点:完全解除服务间耦合
  • 缺点:开启binlog增加数据库负担、实现复杂度高

2 实践

2.1 任务介绍

当酒店数据发生增、删、改时,要求对elasticsearch中数据也要完成相同操作。
同时开启了hotel_admin和hotel_demo两个微服务,利用MQ声明exchange、queue、RoutingKey,在hotel-admin中的增、删、改业务中完成消息发送,在hotel-demo中完成消息监听,并更新elasticsearch中数据,进而完成es和mysql的消息同步

2.2 MQ方面操作

2.2.1 声明交换机队列并且绑定

我打算使用的mq结构如下:
在这里插入图片描述
代码:

@Configuration
public class Myconfig {

    /**
     * 声明交换机
     * @return
     */
    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange(MqConstants.HOTEL_EXCHANGE,true,false);
    }

    /**
     * 插入/更新队列
     * @return
     */
    @Bean
    public Queue insertQueue(){
        return new Queue(MqConstants.HOTEL_INSERT_QUEUE,true);
    }

    /**
     * 删除队列
     * @return
     */
    @Bean
    public Queue deleteQueue(){
        return new Queue(MqConstants.HOTEL_DELETE_QUEUE);
    }

    /**
     * 绑定增/改
     * @return
     */
    @Bean
    public Binding bindingInsert(){
        return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(MqConstants.HOTEL_INSERT_QUEUE);
    }

    /**
     * 绑定删除
     * @return
     */
    @Bean
    public Binding bindingDelete(){
        return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MqConstants.HOTEL_DELETE_KEY);
    }
}

2.2.2 hotel_admin端web层设置mq发送消息

@RestController
@RequestMapping("hotel")
public class HotelController {

    @Autowired
    private IHotelService hotelService;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/{id}")
    public Hotel queryById(@PathVariable("id") Long id){
        return hotelService.getById(id);
    }

    @GetMapping("/list")
    public PageResult hotelList(
            @RequestParam(value = "page", defaultValue = "1") Integer page,
            @RequestParam(value = "size", defaultValue = "1") Integer size
    ){
        Page<Hotel> result = hotelService.page(new Page<>(page, size));

        return new PageResult(result.getTotal(), result.getRecords());
    }

    @PostMapping
    public void saveHotel(@RequestBody Hotel hotel){
        hotelService.save(hotel);
        rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_INSERT_KEY,hotel.getId());
    }

    @PutMapping()
    public void updateById(@RequestBody Hotel hotel){
        if (hotel.getId() == null) {
            throw new InvalidParameterException("id不能为空");
        }
        hotelService.updateById(hotel);
        rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_INSERT_KEY,hotel.getId());
    }

    @DeleteMapping("/{id}")
    public void deleteById(@PathVariable("id") Long id) {
        hotelService.removeById(id);
        rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_DELETE_KEY,id);
    }
}

2.3 hotel_demo端监听接受消息并执行es操作

@Component
public class MsgListener {

    @Autowired
    private IHotelService hotelService;

    /**
     * 监听插入或者更新doc的信息
     * @param id
     */
    @RabbitListener(queues = MqConstants.HOTEL_INSERT_QUEUE)
    public void receiveInsertMsg(Long id){
        hotelService.InsertOrUpdate(id);
    }

    /**
     * 监听删除doc的信息
     * @param id
     */
    @RabbitListener(queues = MqConstants.HOTEL_DELETE_QUEUE)
    public void receiveDeleteMsg(Long id){
        hotelService.deleteEsById(id);
    }
}

之后去service层实现监听类调用的增删方法

    @Override
    public void InsertOrUpdate(Long id) {
        //1 根据id去数据库查信息
        Hotel db_hotel = this.getById(id);
        if(db_hotel == null){
            log.warn("id为:"+id+"的酒店不存在");
            return;
        }
        //2 构建添加对象
        HotelDoc hotelDoc = new HotelDoc(db_hotel);
        String jsonString = JSON.toJSONString(hotelDoc);
        IndexRequest request = new IndexRequest("hotel").id(db_hotel.getId().toString());
        request.source(jsonString, XContentType.JSON);
        //3 发送添加请求
        try {
            IndexResponse result = restHighLevelClient.index(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            log.warn("同步id为:"+id+"的信息超时");
        }
    }

    @Override
    public void deleteEsById(Long id) {
        DeleteRequest request = new DeleteRequest("hotel",id.toString());
        try {
            restHighLevelClient.delete(request,RequestOptions.DEFAULT);
        } catch (IOException e) {
            log.warn("删除id为:"+id+"的信息超时");
        }
    }