ES 与 MySQL 数据同步:深入解析与实战拓展

发布于:2025-02-19 ⋅ 阅读:(22) ⋅ 点赞:(0)

ES 与 MySQL 数据同步:深入解析与实战拓展

一、引言

在当今数字化时代,数据如同企业的生命线,其一致性和实时性至关重要。在许多业务场景中,我们常常需要同时使用 MySQL 这样的关系型数据库来存储结构化数据,以及 Elasticsearch 这样的搜索引擎来实现高效的数据检索。以酒店预订系统为例,酒店的详细信息(如房型、价格、位置等)存储在 MySQL 中,而 Elasticsearch 则负责为用户提供快速的搜索服务,帮助用户在海量酒店数据中精准定位心仪的酒店。

为了确保用户在搜索时能获取到最新的酒店信息,当 MySQL 中的酒店数据发生变化时,Elasticsearch 中的数据也必须同步更新。这就引出了本文的核心内容——ES 与 MySQL 之间的数据同步。接下来,我们将深入探讨三种常见的数据同步方案,并详细阐述如何在实际项目中实现它们。

二、数据同步方案

(一)同步调用

  1. 原理剖析:同步调用是一种最为直接的数据同步方式。简单来说,就是当酒店管理服务对 MySQL 数据库完成操作(如新增、修改或删除酒店信息)后,立即调用另一个专门负责更新 Elasticsearch 数据的服务接口。这种方式就好比你在餐厅点完菜后,服务员直接把你的订单信息同时告知厨房和收银台,中间没有任何缓冲环节。
  2. 优点
    • 实现简单:从技术实现角度来看,它不需要复杂的架构设计和中间件支持。开发人员只需要在现有的业务逻辑中添加几行调用接口的代码,就能快速实现数据同步功能。对于一些小型项目或者对数据实时性要求极高且业务逻辑相对简单的场景来说,这种方式非常适用。
    • 数据实时性强:由于是直接调用接口,几乎不存在数据延迟的问题。只要 MySQL 数据更新完成,ES 数据就能立即同步更新,确保了搜索结果的实时准确性。
  1. 缺点
    • 业务耦合度高:酒店管理服务与更新 ES 数据的服务紧密耦合在一起。这意味着如果更新 ES 数据的服务发生任何变化(如接口地址变更、接口参数调整等),酒店管理服务也必须相应地进行修改。这种强耦合关系不仅增加了系统的维护难度,还降低了系统的可扩展性和灵活性。例如,如果未来需要更换 Elasticsearch 为其他搜索引擎,那么整个酒店管理服务的代码都可能需要进行大规模的改动。
    • 影响业务性能:同步调用会阻塞当前业务线程,直到 ES 数据更新完成。如果 ES 服务出现性能问题或者网络延迟,将会直接影响到酒店管理服务的响应速度,进而影响用户体验。想象一下,用户在预订酒店时,因为等待 ES 数据同步而导致页面长时间无响应,这无疑会极大地降低用户对系统的满意度。

(二)异步通知(常用)

  1. 原理剖析:异步通知方案引入了消息队列(MQ)作为中间件,实现了服务之间的解耦。当 hotel - admin 对 MySQL 数据库完成增、删、改操作后,它并不会直接去调用更新 ES 数据的服务,而是向 MQ 发送一条消息。这条消息就像是一张便签,上面记录了数据库操作的相关信息(如操作类型、酒店 ID 等)。而 hotel - demo 则一直在监听 MQ,当它接收到这条消息时,就会根据消息内容去更新 Elasticsearch 中的数据。这就好比你在网上下单后,商家并不会立即处理你的订单,而是先把订单信息记录下来,等仓库工作人员有空时再去处理。
  2. 优点
    • 低耦合:通过 MQ 进行通信,hotel - admin 和 hotel - demo 之间不再直接依赖对方。它们只需要遵循共同的消息格式和协议,就可以独立地进行开发、部署和维护。这种低耦合特性使得系统的可扩展性大大增强,例如,未来如果需要增加新的服务来处理其他与酒店数据相关的业务,只需要让它监听 MQ 上的相关消息即可,而不会影响到现有的酒店管理服务和 ES 数据更新服务。
    • 提高系统性能:由于异步通知不会阻塞业务线程,hotel - admin 在完成数据库操作后可以立即返回响应给用户,无需等待 ES 数据更新完成。这大大提高了系统的吞吐量和响应速度,能够更好地应对高并发的业务场景。
  1. 缺点
    • 依赖 MQ 的可靠性:如果 MQ 出现故障(如服务器宕机、消息丢失等),可能导致数据同步失败。虽然可以通过一些技术手段(如消息持久化、集群部署等)来提高 MQ 的可靠性,但这也增加了系统的复杂度和运维成本。
    • 数据一致性问题:在异步通知过程中,由于消息的发送和接收存在一定的时间差,可能会出现短暂的数据不一致情况。例如,在 hotel - admin 发送消息后,hotel - demo 还未接收到消息之前,用户进行搜索,可能会获取到旧的数据。虽然这种不一致性通常是短暂的,但在对数据一致性要求极高的场景下,可能需要采取额外的措施来解决。

(三)监听 binlog

  1. 原理剖析:binlog 是 MySQL 数据库的二进制日志文件,它记录了数据库的所有增、删、改操作。监听 binlog 方案就是通过 Canal 这样的工具,实时监控 MySQL 的 binlog 文件。当有新的操作记录写入 binlog 时,Canal 会将这些操作解析出来,并发送给 hotel - demo。hotel - demo 接收到这些操作信息后,就可以根据它们来更新 Elasticsearch 中的数据。这就好比有一个监控摄像头,实时记录着 MySQL 数据库的一举一动,一旦有新的操作发生,就立即通知相关人员进行处理。
  2. 优点
    • 完全解耦:这种方案实现了服务之间的最大程度解耦。hotel - admin 只需要专注于数据库操作,无需关心 ES 数据的更新;hotel - demo 也只需要关注从 Canal 接收到的 binlog 信息并进行相应处理,与酒店管理服务没有直接关联。这种解耦特性使得系统的各个部分可以独立演进,提高了系统的稳定性和可维护性。
    • 数据完整性高:由于 binlog 记录了数据库的所有操作,通过监听 binlog 可以确保 ES 数据与 MySQL 数据的完全一致性,不存在数据丢失或遗漏的情况。
  1. 缺点
    • 增加数据库负担:开启 binlog 功能会增加 MySQL 数据库的 I/O 开销,因为每次数据库操作都需要写入 binlog 文件。对于高并发的数据库系统来说,这可能会对性能产生一定的影响。
    • 实现复杂度高:监听 binlog 需要对 MySQL 的内部机制有深入的了解,同时还需要掌握 Canal 等工具的使用。此外,在解析 binlog 信息并将其转换为 ES 数据更新操作的过程中,也需要处理各种复杂的情况(如事务处理、数据格式转换等),这增加了开发和维护的难度。

三、实现数据同步

(一)总体步骤

  1. 部署 MQ:选择合适的 MQ 产品(如 RabbitMQ、Kafka 等),并进行单机部署。在部署过程中,需要根据实际业务需求进行相关配置,如设置主机名、端口、虚拟主机、用户名和密码等。以 RabbitMQ 为例,在安装完成后,需要通过命令行或管理界面进行这些参数的配置,确保 MQ 能够正常运行并与其他服务进行通信。
  2. 声明配置:在接收者(hotel - demo)中声明 exchange、queue、RoutingKey。这些配置信息就像是 MQ 中的地址和路由规则,exchange 负责接收生产者发送的消息,并根据 RoutingKey 将消息路由到对应的 queue 中。在声明时,需要根据业务逻辑和消息类型进行合理的规划,例如,将酒店新增和修改消息发送到一个特定的 queue,将删除消息发送到另一个 queue。
  3. 发送消息:在 hotel - admin 发送者中的增、删、改业务逻辑代码中,添加发送消息到 MQ 的代码。在发送消息时,需要构建正确的消息内容,包括消息体(如酒店 ID、操作类型等)和 RoutingKey,确保消息能够准确地被路由到对应的 queue 中。
  4. 监听更新:在 hotel - demo 接收者中编写消息监听代码,当接收到消息时,根据消息内容调用相应的业务方法来更新 Elasticsearch 中的数据。同时,需要处理可能出现的异常情况,如消息解析失败、ES 操作失败等,确保系统的稳定性和可靠性。
  5. 启动测试:完成上述步骤后,启动 hotel - admin 和 hotel - demo 服务,并使用 Postman 等工具调用 MySQL 数据库的增、删、改接口,然后在 ES 页面进行搜索,验证数据同步功能是否正常。在测试过程中,需要全面覆盖各种业务场景,包括正常情况和异常情况,确保系统在各种情况下都能正确地实现数据同步。

(二)导入依赖和 yaml

  1. 引入依赖:在项目的 pom.xml 文件中添加 spring - boot - starter - amqp 依赖,该依赖提供了与 RabbitMQ 集成的相关功能。添加依赖后,Maven 会自动下载并管理相关的 jar 包,确保项目能够顺利使用 RabbitMQ 的功能。
<!--amqp-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  1. 配置 yaml:在 application.yml 文件中添加 RabbitMQ 的配置信息,包括主机名、端口、虚拟主机、用户名和密码等。这些配置信息将被 Spring Boot 应用读取,用于创建与 RabbitMQ 的连接。例如:
spring:
  rabbitmq:	#MQ配置
    host: 192.168.194.131 # 主机名
    port: 5672 # 端口
    virtual - host: / # 虚拟主机
    username: itcast # 用户名
    password: 123321 # 密码

(三)声明交换机、队列

  1. 声明队列交换机名称:在 cn.itcast.hotel.constatnts 包下新建 MqConstants 类,定义队列和交换机的相关常量。这些常量将在整个项目中被用于标识和操作 MQ 中的各个组件,确保消息的正确路由和处理。
package cn.itcast.hotel.constatnts;

public class MqConstants {
    /**
     * 交换机
     */
    public final static String HOTEL_EXCHANGE = "hotel.topic";
    /**
     * 监听新增和修改的队列
     */
    public final static String HOTEL_INSERT_QUEUE = "hotel.insert.queue";
    /**
     * 监听删除的队列
     */
    public final static String HOTEL_DELETE_QUEUE = "hotel.delete.queue";
    /**
     * 新增或修改的 RoutingKey
     */
    public final static String HOTEL_INSERT_KEY = "hotel.insert";
    /**
     * 删除的 RoutingKey
     */
    public final static String HOTEL_DELETE_KEY = "hotel.delete";
}
  1. 声明队列交换机:在 hotel - demo 消费者中定义 MqConfig 配置类,用于声明队列、交换机及其绑定关系。通过这些声明,RabbitMQ 能够知道如何将消息从 exchange 路由到对应的 queue 中。例如,insertQueueBinding 方法将 insertQueue 与 topicExchange 通过 HOTEL_INSERT_KEY 进行绑定,确保新增或修改消息能够正确地被路由到 insertQueue 中。
package cn.itcast.hotel.config;

import cn.itcast.hotel.constants.MqConstants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MqConfig {
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(MqConstants.HOTEL_EXCHANGE, true, false);
    }

    @Bean
    public Queue insertQueue() {
        return new Queue(MqConstants.HOTEL_INSERT_QUEUE, true);
    }

    @Bean
    public Queue deleteQueue() {
        return new Queue(MqConstants.HOTEL_DELETE_QUEUE, true);
    }

    @Bean
    public Binding insertQueueBinding() {
        return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(MqConstants.HOTEL_INSERT_KEY);
    }

    @Bean
    public Binding deleteQueueBinding() {
        return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MqConstants.HOTEL_DELETE_KEY);
    }
}

(四)发送 MQ 消息

在 hotel - admin 发送者的增、删、改业务方法中,添加发送消息到 MQ 的代码。以新增酒店业务为例,当酒店信息成功插入到 MySQL 数据库后,构建包含酒店 ID 和操作类型(新增)的消息,并使用 RabbitTemplate 将消息发送到对应的 exchange 和 RoutingKey。

@Service
public class HotelAdminService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void addHotel(Hotel hotel) {
        // 将酒店信息插入到MySQL数据库
        hotelRepository.save(hotel);

        // 发送新增消息到MQ
        rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_INSERT_KEY, hotel.getId());
    }

    // 其他增、删、改业务方法类似
}

(五)接收 MQ 消息

  1. 编写业务逻辑
    • 在 IHotelService 接口中新增 deleteById 和 insertById 方法声明,用于定义删除和插入 ES 数据的业务逻辑。
public interface IHotelService {
    void deleteById(Long id);
    void insertById(Long id);
}
- 在 HotelService 类中实现上述方法。在 deleteById 方法中,使用 Elasticsearch 的 DeleteRequest 构建删除请求,并发送到 Elasticsearch 集群;在 insertById 方法中,先根据酒店 ID 从数据库查询酒店信息,将其转换为适合 Elasticsearch 存储的文档类型,然后使用 IndexRequest 构建插入请求并发送。
@Service
public class HotelService implements IHotelService {

    @Autowired
    private RestHighLevelClient client;

    @Autowired
    private HotelRepository hotelRepository;

    @Override
    public void deleteById(Long id) {
        try {
            // 1.准备 Request
            DeleteRequest request = new DeleteRequest("hotel", id.toString());
            // 2.发送请求
            client.delete(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void insertById(Long id) {
        try {
            // 0.根据 id 查询酒店数据
            Hotel hotel = hotelRepository.findById(id).orElse(null);
            if (hotel!= null) {
                // 转换为文档类型
                HotelDoc hotelDoc = new HotelDoc(hotel);

                // 1.准备 Request 对象
                IndexRequest request = new IndexRequest("hotel").id(hotel.getId().toString());
                // 2.准备 Json 文档
                request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
                // 3.发送请求
                client.index(request, RequestOptions.DEFAULT);
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
  1. 编写监听器:在 cn.itcast.hotel.mq 包下新增 HotelListener 类,使用 @RabbitListener 注解监听 MQ 队列中的消息。当接收到新增或修改消息时,调用 insertById 方法;当接收到删除消息时,调用 deleteById 方法。
package cn.itcast.hotel.mq;

import cn.itcast.hotel.constants.MqConstants;
import cn.itcast.hotel.service.IHotelService;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class HotelListener {

    @Autowired
    private IHotelService hotelService;

    /**
     * 监听酒店新增或修改的业务
     * @param id 酒店 id
     */
    @RabbitListener(queues = MqConstants.HOTEL_INSERT_QUEUE)
    public void listenHotelInsertOrUpdate(Long id) {
        hotelService.insertById(id);
    }

    /**
     * 监听酒店删除的业务
     * @param id 酒店 id
     */
    @RabbitListener(queues = MqConstants.HOTEL_DELETE_QUEUE)
    public void listenHotelDelete(Long id) {
        hotelService.deleteById(id);
    }
}

(六)测试

  1. 测试工具选择:使用 Postman 作为测试工具,它提供了简洁易用的界面,方便我们发送各种类型的 HTTP 请求,模拟对 MySQL 数据库的增、删、改操作。
  2. 测试用例设计
    • 新增测试:使用 Postman 发送 POST 请求到酒店新增接口,传入正确的酒店信息,然后在 ES 页面搜索该酒店,验证酒店信息是否成功同步到 ES 中。
    • 修改测试:发送 PUT 请求到酒店修改接口,修改某酒店的部分信息,再次在 ES 页面搜索该酒店,检查修改后的信息是否同步。
    • 删除测试:发送 DELETE 请求到酒店删除接口,删除某酒店,然后在 ES 页面搜索该酒店,确认酒店信息已从 ES 中删除。
      3

网站公告

今日签到

点亮在社区的每一天
去签到