目录
一、前置基础知识
MySQL 的二进制日志(Binary Log,简称 Binlog)是 MySQL 服务端非常重要的一种日志文件,用于记录数据库中所有数据变更操作 (如 INSERT、UPDATE、DELETE、DDL 等),但不包括 SELECT 和 SHOW 这类不会修改数据的操作。Binlog 在 MySQL 数据库的高可用、灾备恢复和读写分离等场景中起着至关重要的作用。
Binlog 的主要作用包括:主从复制(Replication)、数据恢复、数据库热备、读写分离
1、主从复制(Replication)
Binlog 是实现 MySQL 主从复制的基础。主库将所有的数据变更记录到 Binlog 中,从库通过读取主库的 Binlog 并在本地重放这些操作,从而实现与主库的数据同步。具体流程如下:
- Master 主库将数据变更写入 二进制日志(Binary Log)文件 。
- Slave 从库通过 I/O 线程向 Master 发送 Dump 协议请求 ,读取 Master 的 Binary Log 中的事件(Events),并将其复制到本地的 中继日志(Relay Log) 中。
- Slave 从库的 SQL 线程读取 中继日志 Relay Log 中的事件(Events),并在本地执行这些事件,从而实现数据的同步更新。
2、数据恢复
当发生误删数据或系统故障时,可以通过解析 Binlog 文件,回放特定时间段内的数据变更操作,从而实现数据的精准恢复。
3、数据库热备
在主库出现故障的情况下,可以快速切换到一个保持同步的从库,保证数据库服务的连续性,提高系统的高可用性。
4、读写分离
主数据库只负责业务数据的写入操作,而多个从数据库只负责业务数据的查询工作,在读多写少场景下,可以提高数据库工作效率。
5、存储位置及命名
默认情况下,Binlog 文件会被保存在 MySQL 的数据目录中,通常为 /usr/local/mysql/data
或 /var/lib/mysql/
,具体路径可以在 MySQL 配置文件 /etc/my.cnf
中配置。如图所示:
Binlog 文件是以二进制形式存在的,每当一个 Binlog 文件达到指定大小(由 max_binlog_size 控制,默认为 1GB)时,MySQL 会自动创建一个新的 Binlog 文件,并按顺序递增编号。
除了实际的 Binlog 文件外,还有一个名为 mysql-bin.index 的索引文件,它记录了当前所有的 Binlog 文件列表,方便 MySQL 管理和读取这些文件。如图所示:
如果希望自定义 Binlog 的前缀名称(即默认的 binlog 改为其他名称),需要编辑 MySQL 的配置文件 /etc/my.cnf,添加或修改如下配置项:
vi /etc/my.cnf
[mysqld]
# 设置 Binlog 文件的前缀名,比如 mysql-bin,则生成的文件为 mysql-bin.000001 等
log-bin=mysql-bin
保存后重启 MySQL 服务,新的配置才会生效。查看效果。
service mysql restart;
ll /usr/local/mysql/data
二、Maxwell简介
1、简介
Maxwell 是由美国Zendesk公司开源,用Java编写的MySQL变更数据抓取软件。它会实时监控MySQL数据库的数据变更操作(包括insert、update、delete),并将变更数据以 JSON 格式发送给 Kafka、Kinesi等流数据处理平台。官网地址:http://maxwells-daemon.io/
Maxwell的工作原理是实时读取MySQL数据库的二进制日志(Binlog),从中获取变更数据,再将变更数据以JSON格式发送至Kafka等流处理平台。
Maxwell的工作原理就是将自己伪装成slave,并遵循MySQL主从复制的协议,从master同步数据。
2、Maxwell同步数据特点
2.1.历史记录同步
例如开始未开启 binlog ,在表中增加100条数据。然后开启binlog,但binlog中未记录开启前插入的 100条数据记录。但在maxwell中有个脚本,到指定的数据库的表中进行全表扫描,然后把历史数据进行同步。
2.2.断点续传
就是在服务挂了重启后是否会从上次断开的位置继续。Maxwell会记录断点前的偏量,这个偏量记录在存储元数据的数据库文件中,因此在使用Maxwell前要创建 存储元数据的数据库。
三、前期准备
1、查看网卡:
2、配置静态IP
vi /etc/sysconfig/network-scripts/ifcfg-ens32 ---- 根据自己网卡设置。
3、设置主机名
hostnamectl --static set-hostname 主机名
例如:
hostnamectl --static set-hostname hadoop001
4、配置IP与主机名映射
vi /etc/hosts
5、关闭防火墙
systemctl stop firewalld
systemctl disable firewalld
6、配置免密登录
四、JDK的安装
五、Maxwell的安装与部署
1、Maxwell的下载安装
1.1. 下载
https://github.com/zendesk/maxwell/releases/download/v1.29.2/maxwell-1.29.2.tar.gz
注:Maxwell-1.30.0及以上版本不再支持JDK1.8。
1.2 上传
使用xshell上传到指定安装路径此处是安装路径是 /opt/module
1.3 解压重命名
tar -zxvf maxwell-1.29.2.tar.gz
mv maxwell-1.29.2/ maxwell
2、目录及脚本说明
- maxwell脚本是同步binlog日志,同步后直接生成json格式。
- maxwell-bootstrap脚本是从表中同步历史记录,但同步完并不直接生成json格式,需要交给maxwell本身再进行处理。
3、配置Mysql文件
vi /etc/my.cnf
#数据库id
server-id = 1
#启动binlog,该参数的值会作为binlog的文件名
log-bin=mysql-bin
#binlog类型,Maxwell要求Binlog采用Row-based模式。
binlog_format=row
#配置Maxwell监控的 Mysql数据库管理系统中的数据库名称#如果不配置则会监控 Mysql数据库管理系统下的所有数据库
#maxwell监控多个数据库,则增加即可
binlog-do-db=hwadee01#binlog-do-db=hwadee02
#binlog-do-db=hwadee03
参数说明:
server-id = 1 #将自己伪装成Mysql主从的slave
log-bin=mysql-bin #修改Mysql的日志名称
binlog_format=row #binlog类型,maxwell要求为row类型
binlog-do-db=hwadee #启用binlog的数据库,需要进行创建数据库主从复制三种方式:
Statement-based:基于语句,主机的执行的所有写操作的SQL语句(包括insert、update、delete)等全部记录到Binlog日志中,在从机slave执行相同语句。
Row-based:基于行,主机上操作影响的行记录(每次写操作后被操作行记录的变化) 写到Binlog日志,在从机将影响的行进行同步
mixed:混合模式,默认是Statement-based,如果SQL语句可能导致数据不一致,就自动切换到Row-based。
优缺点:
Statement-based:
优点:节省空间
缺点:有可能造成数据不一致,例如insert语句中包含now()函数。
Row-based:
优点:保持数据的绝对一致性。
缺点:占用较大空间。
4、创建Maxwell所需数据库和用户
4.1.创建元数据库
在Mysql数据库管理系统中创建一个 maxwell 库用于存储 Maxwell 的元数据。名字不要修改。
CREATE DATABASE maxwell;
4.2.设置安全级别
注意:此设置是 Mysql数据库 8.0 以下版本才需要设置
set global validate_password.length=4;
set global validate_password.policy=0;
4.3.创建Maxwell的数据库用户并赋权限
创建一个用户,专门用来进行数据同步
CREATE USER 'maxwell'@'%' IDENTIFIED BY 'root';
GRANT ALL ON maxwell.* TO 'maxwell'@'%';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';FLUSH PRIVILEGES;
5、配置Maxwell
主要配置:从哪里拿数据,谁拿,数据送到哪里
数据发送给 Kafka的主题,则需要配置Kafka的主题所在服务器地址
从哪台机器上读数据,则需配置主机IP或主机名和maxwell的数据库
谁来负责同步数据,则需配置用于同步数据的用户的用户名和密码
cd /opt/module/maxwell
cp config.properties.example config.propertiesvi config.properties
#数据送到哪里?有stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis
#此处选Kafka为案例,配置Kafka的主题所在服务器地址及主体
#Kafka topic,可静态配置如:topic_db,可动态配置如:%{database}_%{table}
producer=kafka
kafka.bootstrap.servers=hadoop001:9092,hadoop002:9092,hadoop003:9092
kafka_topic=topic_db# 元数据数据库相关配置
# 从哪台机器上读数据--从hadoop001上读取数据
host=hadoop001# 谁来负责同步数据?用于同步数据的用户(用户名和密码)
user=maxwell
password=maxwell
jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true#用于表示唯一,后续同步历史记录用,不加则默认
client_id=maxwell_1
# 过滤
# 过滤掉不需要的表,如hwadee01数据库中z_log表是日志数据备份,无须采集
filter=exclude:hwadee01.z_log
# 根据当前变化数据的主键进行hash,再用hash值进行决定到哪个分区。# 指定数据按照主键分组进入Kafka不同分区,避免数据倾斜
producer_partition_by=primary_key
六、Maxwell的使用
我们使用maxwell监控Mysql管理的数据库中的数据变化记录, 将变化的记录发送到Kafka对应的主题topic_db上。
1、启动Kafka集群
若Maxwell发送数据的目的地为Kafka集群,则需要先确保Kafka集群为启动状态。并创建kafka的主 topic_db (3个分区,每个分区将来对应一个消费者),具体见Kafka的安装与使用。
启动zookeeper集群:/usr/bin/zkall.sh start
启动kafka集群 :/usr/bin/kfall.sh start
#创建Kafka主题与分区
bin/kafka-topics.sh --bootstrap-server hadoop001:9092 --create --partitions 3 --replication-factor 3 --topic topic_db
2、创建数据库与表
#创建数据库
CREATE DATABASE hwadee01;
# 创建表
DROP TABLE IF EXISTS `activity_info`; CREATE TABLE `activity_info` ( `id` bigint NOT NULL AUTO_INCREMENT COMMENT '活动id', `activity_name` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '活动名称', `activity_type` varchar(10) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '活动类型(1:满减,2:折扣)', `activity_desc` varchar(2000) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '活动描述', `start_time` datetime NULL DEFAULT NULL COMMENT '开始时间', `end_time` datetime NULL DEFAULT NULL COMMENT '结束时间', `create_time` datetime NULL DEFAULT NULL COMMENT '创建时间', `operate_time` datetime NULL DEFAULT NULL COMMENT '修改时间', PRIMARY KEY (`id`) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci COMMENT = '活动表' ROW_FORMAT = DYNAMIC; /* 用于历史数据全量同步使用 */ INSERT INTO `activity_info` VALUES (1, '小米手机专场', '3101', '小米手机满减2', '2022-01-13 01:01:54', '2023-06-19 00:00:00', '2022-05-27 00:00:00', NULL); INSERT INTO `activity_info` VALUES (2, 'CAREMiLLE口红满两个8折', '3102', 'CAREMiLLE口红满两个8折', '2022-01-13 01:01:54', '2023-06-19 00:00:00', '2022-05-27 00:00:00', NULL);
3、Maxwell启停脚本
启动:/opt/module/maxwell/bin/maxwell --config /opt/module/maxwell/config.properties --daemon
停止:ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | awk '{print $2}' | xargs kill -9
脚本:
touch /usr/bin/mxw.sh
chmod 777 /usr/bin/mxw.sh
vi /usr/bin/mxw.sh
#!/bin/bash MAXWELL_HOME=/opt/module/maxwell status_maxwell(){ result=`ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | wc -l` return $result } start_maxwell(){ status_maxwell if [[ $? -lt 1 ]]; then echo "启动Maxwell" $MAXWELL_HOME/bin/maxwell --config $MAXWELL_HOME/config.properties --daemon else echo "Maxwell正在运行" fi } stop_maxwell(){ status_maxwell if [[ $? -gt 0 ]]; then echo "停止Maxwell" ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | awk '{print $2}' | xargs kill -9 else echo "Maxwell未在运行" fi } case $1 in start ) start_maxwell ;; stop ) stop_maxwell ;; restart ) stop_maxwell start_maxwell ;; esac
4、启动Maxwell
启动 Maxwell并查看进程与maxwell数据库
启动:/usr/bin/mxw.sh start
jps
停止:/usr/bin/mxw.sh stop
5、启动Kafka消费者
cd /opt/module/kafka
bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --topic topic_db
6、增量数据同步
6.1 模拟生成数据
INSERT INTO `activity_info` VALUES (3, '小米手机专场', '3101', '小米手机满减2', '2022-01-13 01:01:54', '2023-06-19 00:00:00', '2022-05-27 00:00:00', NULL); INSERT INTO `activity_info` VALUES (4, 'CAREMiLLE口红满两个8折', '3102', 'CAREMiLLE口红满两个8折', '2022-01-13 01:01:54', '2023-06-19 00:00:00', '2022-05-27 00:00:00', NULL); INSERT INTO `activity_info` VALUES (5, '联想活动专场满减', '3101', '联想活动专场满减', '2022-01-13 01:01:54', '2023-06-19 00:00:00', '2022-05-27 00:00:00', NULL); INSERT INTO `activity_info` VALUES (6, 'TCL全场9折', '3103', 'TCL全场9折', '2022-01-13 01:01:54', '2023-06-19 00:00:00', '2022-05-27 00:00:00', NULL);
6.2 观察Kafka消费者
7、历史数据全量同步
我们已经实现了使用Maxwell实时增量同步MySQL变更数据的功能。但有时可能需要使用到MySQL数据库中从历史至今的一个完整的数据集。这就需要我们在进行增量同步之前,先进行一次历史数据的全量同步。这样就能保证得到一个完整的数据集。
1、maxwell-bootstrap
此时就需要使用 Maxwell提供的 bootstrap 功能来进行历史数据的全量同步,使用maxwell-bootstrap,命令如下:
/opt/module/maxwell/bin/maxwell-bootstrap --database hwadee01 --table activity_info --config /opt/module/maxwell/config.properties
#注意:maxwell-bootstrap 只是把表的数据查出来,但自己本身没有将数据发送到kafka的能力,因此需要通过maxwell本身来发送,因此需要根据maxwell的client_id找到maxwell,而client_id在config.properties进行了配置。
脚本:
touch /usr/bin/mysql_to_kafka_init.sh
chmod 777 /usr/bin/mysql_to_kafka_init.sh
vi /usr/bin/mysql_to_kafka_init.sh
#!/bin/bash #该脚本的作用是初始化所有的业务数据,只需执行一次 MAXWELL_HOME=/opt/module/maxwell import_data(){ $MAXWELL_HOME/bin/maxwell-bootstrap --database hwadee01 --table $1 --config $MAXWELL_HOME/config.properties } case $1 in "activity_info" ) import_data activity_info ;; "sku_info" ) import_data sku_info ;; "all" ) import_data activity_info import_data sku_info ;; esac
历史数据进行全量同步:
指定要全量同步的表
/usr/bin/mysql_to_kafka_init.sh activity_info
2、执行后结果格式
cd /opt/module/kafka
bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --topic topic_db
注意事项:
(1)第一条type为bootstrap-start和最后一条type为bootstrap-complete的数据,是bootstrap开始和结束的标志,不包含数据,中间的type为bootstrap-insert的数据才包含数据。
(2)一次bootstrap输出的所有记录的ts都相同,为bootstrap开始的时间。