ElasticSearch与MySQL如何进行数据同步?

发布于:2024-10-17 ⋅ 阅读:(8) ⋅ 点赞:(0)

ElasticSearch(ES)与MySQL进行数据同步的需求在实际开发中非常常见,尤其是在需要高效的全文搜索或者数据分析时,通常使用MySQL作为事务数据库,ES作为搜索和查询引擎。那么要实现MySQL与ElasticSearch的数据同步,可以采取多种方式。

常见的同步方式

  1. 手动同步
    • 在每次对MySQL进行增删改操作时,手动将数据更新到ElasticSearch。这种方法适用于小型项目,但在数据量大和频繁更新的场景下不太适用。
  2. 定时同步(全量/增量同步)
    • 定期从MySQL拉取数据(全量或增量),然后将数据同步到ElasticSearch中。例如,使用定时任务每隔一段时间执行同步。
  3. 使用数据库的增量日志(Binlog)进行同步
    • 通过捕获MySQL的Binlog(增量日志),当MySQL的数据发生变化时,实时同步到ElasticSearch。这种方式更加实时,且不需要定时全量更新。

具体的实现方案

方案一:基于消息队列的同步方案
  1. 数据写入MySQL时,发送同步消息到消息队列

    • 当应用向MySQL写入数据时,同时将数据变动的消息发送到消息队列(如RabbitMQ、Kafka等)。
  2. 消费者监听消息并同步数据到ElasticSearch

    • 消费者监听消息队列的变动消息,将数据同步到ElasticSearch。

优点

  • 保证实时性。
  • 能够处理高并发。

实现步骤

  • 应用在插入、更新或删除数据时,发送操作类型(如CREATEUPDATEDELETE)和数据内容到消息队列。
  • 消息消费者从队列中读取消息,根据操作类型将数据插入、更新或删除到ElasticSearch中。

示例代码

1. MySQL插入操作发送消息

// 保存数据到MySQL
orderMapper.insert(order);

// 发送消息到消息队列(以RabbitMQ为例)
rabbitTemplate.convertAndSend("orderSyncQueue", order);

2. 消费者同步数据到ElasticSearch

@RabbitListener(queues = "orderSyncQueue")
public void syncOrderToES(Order order) {
    // 判断操作类型,插入或更新ES中的数据
    IndexRequest indexRequest = new IndexRequest("orders").id(order.getId().toString()).source(order);
    elasticsearchClient.index(indexRequest, RequestOptions.DEFAULT);
}
zhuy
方案二:基于Binlog的实时同步方案

MySQL的Binlog记录了所有的增删改操作,通过解析这些日志,可以实时获取数据变动情况,并同步到ElasticSearch中。

1. 使用Canal进行同步

Canal 是阿里巴巴开源的一个MySQL binlog增量订阅&消费组件,可以用于实时地捕获MySQL的Binlog并同步数据到ElasticSearch。

步骤

  1. 启动MySQL的Binlog功能

    • 在MySQL中开启Binlog功能,并配置server_id(唯一标识),确保MySQL能够产生Binlog。
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=1

      2. 安装并配置Canal

        安装Canal,配置MySQL的连接信息。

        配置Canal去监听MySQL的表,捕获到数据变动时,获取Binlog日志并解析出增删改的操作。

     3.编写消费者逻辑

        当Canal捕获到数据变化时,将相应的数据同步到ElasticSearch。

示例代码:

配置Canal监听MySQL

canal:
  instance:
    dbUsername: root
    dbPassword: password
    dbHost: localhost
    dbPort: 3306
    dbName: order_db
    table: orders

捕获MySQL的Binlog变化

@EventListener
public void onOrderBinlogChange(CanalEntry.Entry entry) {
    List<CanalEntry.RowData> rowDatasList = entry.getRowChange().getRowDatasList();
    for (CanalEntry.RowData rowData : rowDatasList) {
        if (entry.getEventType() == CanalEntry.EventType.INSERT) {
            // 插入操作
            syncInsertToES(rowData.getAfterColumnsList());
        } else if (entry.getEventType() == CanalEntry.EventType.UPDATE) {
            // 更新操作
            syncUpdateToES(rowData.getAfterColumnsList());
        } else if (entry.getEventType() == CanalEntry.EventType.DELETE) {
            // 删除操作
            syncDeleteFromES(rowData.getBeforeColumnsList());
        }
    }
}

将数据同步到ElasticSearch

private void syncInsertToES(List<CanalEntry.Column> columns) {
    // 将MySQL数据转换成ES文档格式,并插入ElasticSearch
    IndexRequest indexRequest = new IndexRequest("orders").id(getColumnValue(columns, "id")).source(columnsToMap(columns));
    elasticsearchClient.index(indexRequest, RequestOptions.DEFAULT);
}

private void syncUpdateToES(List<CanalEntry.Column> columns) {
    // 更新ElasticSearch中的数据
    UpdateRequest updateRequest = new UpdateRequest("orders", getColumnValue(columns, "id")).doc(columnsToMap(columns));
    elasticsearchClient.update(updateRequest, RequestOptions.DEFAULT);
}

private void syncDeleteFromES(List<CanalEntry.Column> columns) {
    // 删除ElasticSearch中的数据
    DeleteRequest deleteRequest = new DeleteRequest("orders", getColumnValue(columns, "id"));
    elasticsearchClient.delete(deleteRequest, RequestOptions.DEFAULT);
}

注意:

在使用消息中间件(如RabbitMQ、Kafka)实现数据同步时,消息的发送是主动的,由应用程序或服务在执行增删改操作时,主动将消息发送到消息队列。而消息队列本身并不具备监听数据库变化的功能,它的角色是用来存储和传递消息,消息的生产和消费逻辑需要在应用程序中实现。

消息发送的流程:

  1. 生产者(业务逻辑层)主动发送消息: 当应用程序执行数据库的增、删、改操作时,需要主动地将这些操作的信息发送到消息队列中。这通常是在业务代码中,在操作数据库的同时添加发送消息的逻辑。例如,新增一条记录后,会主动发送一个"新增"的消息到队列中。

  2. 消息队列(MQ)接收消息: 消息队列(如RabbitMQ、Kafka)会接收生产者发送的消息,将消息存储在队列中,并根据配置将消息推送给消费者或等待消费者主动拉取。

  3. 消费者监听队列并处理消息: 消费者服务通过监听指定的队列来接收消息,接收到消息后,消费者根据消息类型(新增、修改、删除)来执行相应的操作,比如同步到ElasticSearch或进行其他数据处理操作。

2. 使用Debezium进行同步

Debezium 是一个开源的CDC(Change Data Capture)平台,也可以实时监听MySQL的变化并将数据同步到其他存储系统,包括ElasticSearch。

步骤

        1.安装并配置Debezium连接MySQL。

        2.配置监听的表以及变动捕获逻辑。

        3.实现数据同步逻辑,将数据变动同步到ElasticSearch。

总结

  1. 消息队列同步方案:适用于数据操作频繁的场景,能够保证高并发时的系统稳定性和实时性,常用RabbitMQ或Kafka等消息队列实现。

  2. Binlog同步方案:基于Canal或Debezium的同步可以实现更为实时的同步,能够捕获数据库级别的所有数据变化。Binlog方式不依赖应用层代码改动,适合于对MySQL增删改同步要求较高的场景。

  3. 定时同步方案:适用于不需要实时同步的场景,通过定时任务进行批量同步

不同方案各有优缺点,根据具体项目需求选择合适的同步方式。