目录
一、前置基础知识
1、主从复制(Replication)
2、数据恢复
3、数据库热备
4、读写分离
5、存储位置及命名
二、Maxwell简介
1、简介
2、Maxwell同步数据特点
2.1.历史记录同步
2.2.断点续传
三、前期准备
1、查看网卡:
2、配置静态IP
3、设置主机名
4、配置IP与主机名映射
5、关闭防火墙
6、配置免密登录
四、JDK的安装
五、Maxwell的安装与部署
1、Maxwell的下载安装
2、目录及脚本说明
3、配置Mysql文件
4、创建Maxwell所需数据库和用户
4.1.创建元数据库
4.2.设置安全级别
4.3.创建Maxwell的数据库用户并赋权限
5、配置Maxwell
六、Maxwell的使用
1、启动Kafka集群
2、创建数据库与表
3、Maxwell启停脚本
4、启动Maxwell
5、启动Kafka消费者
6、增量数据同步
6.1 模拟生成数据
6.2 观察Kafka消费者
一、前置基础知识
MySQL 的二进制日志(Binary Log,简称 Binlog)是 MySQL 服务端非常重要的一种日志文件,用于记录数据库中所有数据变更操作 (如 INSERT、UPDATE、DELETE、DDL 等),但不包括 SELECT 和 SHOW 这类不会修改数据的操作。Binlog 在 MySQL 数据库的高可用、灾备恢复和读写分离等场景中起着至关重要的作用。
Binlog 的主要作用包括:主从复制(Replication)、数据恢复、数据库热备、读写分离
1、主从复制(Replication)
Binlog 是实现 MySQL 主从复制的基础。主库将所有的数据变更记录到 Binlog 中,从库通过读取主库的 Binlog 并在本地重放这些操作,从而实现与主库的数据同步。具体流程如下:
- Master 主库将数据变更写入 二进制日志(Binary Log)文件 。
- Slave 从库通过 I/O 线程向 Master 发送 Dump 协议请求 ,读取 Master 的 Binary Log 中的事件(Events),并将其复制到本地的 中继日志(Relay Log) 中。
- Slave 从库的 SQL 线程读取 中继日志 Relay Log 中的事件(Events),并在本地执行这些事件,从而实现数据的同步更新。
2、数据恢复
当发生误删数据或系统故障时,可以通过解析 Binlog 文件,回放特定时间段内的数据变更操作,从而实现数据的精准恢复。
3、数据库热备
在主库出现故障的情况下,可以快速切换到一个保持同步的从库,保证数据库服务的连续性,提高系统的高可用性。
4、读写分离
主数据库只负责业务数据的写入操作,而多个从数据库只负责业务数据的查询工作,在读多写少场景下,可以提高数据库工作效率。
5、存储位置及命名
默认情况下,Binlog 文件会被保存在 MySQL 的数据目录中,通常为 /usr/local/mysql/data
或 /var/lib/mysql/
,具体路径可以在 MySQL 配置文件 /etc/my.cnf
中配置。如图所示:
Binlog 文件是以二进制形式存在的,每当一个 Binlog 文件达到指定大小(由 max_binlog_size 控制,默认为 1GB)时,MySQL 会自动创建一个新的 Binlog 文件,并按顺序递增编号。
除了实际的 Binlog 文件外,还有一个名为 mysql-bin.index 的索引文件,它记录了当前所有的 Binlog 文件列表,方便 MySQL 管理和读取这些文件。如图所示:
如果希望自定义 Binlog 的前缀名称(即默认的 binlog 改为其他名称),需要编辑 MySQL 的配置文件 /etc/my.cnf,添加或修改如下配置项:
vi /etc/my.cnf
[mysqld]
# 设置 Binlog 文件的前缀名,比如 mysql-bin,则生成的文件为 mysql-bin.000001 等
log-bin=mysql-bin
保存后重启 MySQL 服务,新的配置才会生效。查看效果。
service mysql restart;
ll /usr/local/mysql/data
二、Maxwell简介
1、简介
Maxwell 是由美国Zendesk公司开源,用Java编写的MySQL变更数据抓取软件。它会实时监控MySQL数据库的数据变更操作(包括insert、update、delete),并将变更数据以 JSON 格式发送给 Kafka、Kinesi等流数据处理平台。官网地址:http://maxwells-daemon.io/
Maxwell的工作原理是实时读取MySQL数据库的二进制日志(Binlog),从中获取变更数据,再将变更数据以JSON格式发送至Kafka等流处理平台。
Maxwell的工作原理就是将自己伪装成slave,并遵循MySQL主从复制的协议,从master同步数据。
2、Maxwell同步数据特点
2.1.历史记录同步
例如开始未开启 binlog ,在表中增加100条数据。然后开启binlog,但binlog中未记录开启前插入的 100条数据记录。但在maxwell中有个脚本,到指定的数据库的表中进行全表扫描,然后把历史数据进行同步。
2.2.断点续传
就是在服务挂了重启后是否会从上次断开的位置继续。Maxwell会记录断点前的偏量,这个偏量记录在存储元数据的数据库文件中,因此在使用Maxwell前要创建 存储元数据的数据库。
三、前期准备
1、查看网卡:
2、配置静态IP
vi /etc/sysconfig/network-scripts/ifcfg-ens32 ---- 根据自己网卡设置。
3、设置主机名
hostnamectl --static set-hostname 主机名
例如:
hostnamectl --static set-hostname hadoop001
4、配置IP与主机名映射
vi /etc/hosts
5、关闭防火墙
systemctl stop firewalld
systemctl disable firewalld
6、配置免密登录
传送门
四、JDK的安装
传送门
五、Maxwell的安装与部署
1、Maxwell的下载安装
1.1. 下载
https://github.com/zendesk/maxwell/releases/download/v1.29.2/maxwell-1.29.2.tar.gz
注:Maxwell-1.30.0及以上版本不再支持JDK1.8。
1.2 上传
使用xshell上传到指定安装路径此处是安装路径是 /opt/module
1.3 解压重命名
tar -zxvf maxwell-1.29.2.tar.gz
mv maxwell-1.29.2/ maxwell
2、目录及脚本说明
- maxwell脚本是同步binlog日志,同步后直接生成json格式。
- maxwell-bootstrap脚本是从表中同步历史记录,但同步完并不直接生成json格式,需要交给maxwell本身再进行处理。
3、配置Mysql文件
vi /etc/my.cnf
#数据库id
server-id = 1
#启动binlog,该参数的值会作为binlog的文件名
log-bin=mysql-bin
#binlog类型,Maxwell要求Binlog采用Row-based模式。
binlog_format=row
#配置Maxwell监控的 Mysql数据库管理系统中的数据库名称#如果不配置则会监控 Mysql数据库管理系统下的所有数据库
#maxwell监控多个数据库,则增加即可
binlog-do-db=hwadee01#binlog-do-db=hwadee02
#binlog-do-db=hwadee03
参数说明:
server-id = 1 #将自己伪装成Mysql主从的slave
log-bin=mysql-bin #修改Mysql的日志名称
binlog_format=row #binlog类型,maxwell要求为row类型
binlog-do-db=hwadee #启用binlog的数据库,需要进行创建数据库主从复制三种方式:
Statement-based:基于语句,主机的执行的所有写操作的SQL语句(包括insert、update、delete)等全部记录到Binlog日志中,在从机slave执行相同语句。
Row-based:基于行,主机上操作影响的行记录(每次写操作后被操作行记录的变化) 写到Binlog日志,在从机将影响的行进行同步
mixed:混合模式,默认是Statement-based,如果SQL语句可能导致数据不一致,就自动切换到Row-based。
优缺点:
Statement-based:
优点:节省空间
缺点:有可能造成数据不一致,例如insert语句中包含now()函数。
Row-based:
优点:保持数据的绝对一致性。
缺点:占用较大空间。
4、创建Maxwell所需数据库和用户
4.1.创建元数据库
在Mysql数据库管理系统中创建一个 maxwell 库用于存储 Maxwell 的元数据。名字不要修改。
CREATE DATABASE maxwell;
4.2.设置安全级别
注意:此设置是 Mysql数据库 8.0 以下版本才需要设置
set global validate_password.length=4;
set global validate_password.policy=0;
4.3.创建Maxwell的数据库用户并赋权限
创建一个用户,专门用来进行数据同步
CREATE USER 'maxwell'@'%' IDENTIFIED BY 'root';
GRANT ALL ON maxwell.* TO 'maxwell'@'%';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';FLUSH PRIVILEGES;
5、配置Maxwell
主要配置:从哪里拿数据,谁拿,数据送到哪里
数据发送给 Kafka的主题,则需要配置Kafka的主题所在服务器地址
从哪台机器上读数据,则需配置主机IP或主机名和maxwell的数据库
谁来负责同步数据,则需配置用于同步数据的用户的用户名和密码
cd /opt/module/maxwell
cp config.properties.example config.propertiesvi config.properties
#数据送到哪里?有stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis
#此处选Kafka为案例,配置Kafka的主题所在服务器地址及主体
#Kafka topic,可静态配置如:topic_db,可动态配置如:%{database}_%{table}
producer=kafka
kafka.bootstrap.servers=hadoop001:9092,hadoop002:9092,hadoop003:9092
kafka_topic=topic_db# 元数据数据库相关配置
# 从哪台机器上读数据--从hadoop001上读取数据
host=hadoop001# 谁来负责同步数据?用于同步数据的用户(用户名和密码)
user=maxwell
password=maxwell
jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true#用于表示唯一,后续同步历史记录用,不加则默认
client_id=maxwell_1
# 过滤
# 过滤掉不需要的表,如hwadee01数据库中z_log表是日志数据备份,无须采集
filter=exclude:hwadee01.z_log
# 根据当前变化数据的主键进行hash,再用hash值进行决定到哪个分区。# 指定数据按照主键分组进入Kafka不同分区,避免数据倾斜
producer_partition_by=primary_key
六、Maxwell的使用
我们使用maxwell监控Mysql管理的数据库中的数据变化记录, 将变化的记录发送到Kafka对应的主题topic_db上。
1、启动Kafka集群
若Maxwell发送数据的目的地为Kafka集群,则需要先确保Kafka集群为启动状态。并创建kafka的主 topic_db (3个分区,每个分区将来对应一个消费者),具体见Kafka的安装与使用。
Kafka安装与启动用传送门
启动zookeeper集群:/usr/bin/zkall.sh start
启动kafka集群 :/usr/bin/kfall.sh start
#创建Kafka主题与分区
bin/kafka-topics.sh --bootstrap-server hadoop001:9092 --create --partitions 3 --replication-factor 3 --topic topic_db
2、创建数据库与表
#创建数据库
CREATE DATABASE hwadee01;
# 创建表
DROP TABLE IF EXISTS `activity_info`; CREATE TABLE `activity_info` (`id` bigint NOT NULL AUTO_INCREMENT COMMENT '活动id',`activity_name` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '活动名称',`activity_type` varchar(10) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '活动类型(1:满减,2:折扣)',`activity_desc` varchar(2000) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '活动描述',`start_time` datetime NULL DEFAULT NULL COMMENT '开始时间',`end_time` datetime NULL DEFAULT NULL COMMENT '结束时间',`create_time` datetime NULL DEFAULT NULL COMMENT '创建时间',`operate_time` datetime NULL DEFAULT NULL COMMENT '修改时间',PRIMARY KEY (`id`) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci COMMENT = '活动表' ROW_FORMAT = DYNAMIC;/*用于历史数据全量同步使用 */ INSERT INTO `activity_info` VALUES (1, '小米手机专场', '3101', '小米手机满减2', '2022-01-13 01:01:54', '2023-06-19 00:00:00', '2022-05-27 00:00:00', NULL); INSERT INTO `activity_info` VALUES (2, 'CAREMiLLE口红满两个8折', '3102', 'CAREMiLLE口红满两个8折', '2022-01-13 01:01:54', '2023-06-19 00:00:00', '2022-05-27 00:00:00', NULL);
3、Maxwell启停脚本
启动:/opt/module/maxwell/bin/maxwell --config /opt/module/maxwell/config.properties --daemon
停止:ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | awk '{print $2}' | xargs kill -9
脚本:
touch /usr/bin/mxw.sh
chmod 777 /usr/bin/mxw.sh
vi /usr/bin/mxw.sh
#!/bin/bashMAXWELL_HOME=/opt/module/maxwellstatus_maxwell(){result=`ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | wc -l`return $result }start_maxwell(){status_maxwellif [[ $? -lt 1 ]]; thenecho "启动Maxwell"$MAXWELL_HOME/bin/maxwell --config $MAXWELL_HOME/config.properties --daemonelseecho "Maxwell正在运行"fi }stop_maxwell(){status_maxwellif [[ $? -gt 0 ]]; thenecho "停止Maxwell"ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | awk '{print $2}' | xargs kill -9elseecho "Maxwell未在运行"fi }case $1 instart )start_maxwell;;stop )stop_maxwell;;restart )stop_maxwellstart_maxwell;; esac
4、启动Maxwell
启动 Maxwell并查看进程与maxwell数据库
启动:/usr/bin/mxw.sh start
jps
停止:/usr/bin/mxw.sh stop
5、启动Kafka消费者
cd /opt/module/kafka
bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --topic topic_db
6、增量数据同步
6.1 模拟生成数据
INSERT INTO `activity_info` VALUES (3, '小米手机专场', '3101', '小米手机满减2', '2022-01-13 01:01:54', '2023-06-19 00:00:00', '2022-05-27 00:00:00', NULL); INSERT INTO `activity_info` VALUES (4, 'CAREMiLLE口红满两个8折', '3102', 'CAREMiLLE口红满两个8折', '2022-01-13 01:01:54', '2023-06-19 00:00:00', '2022-05-27 00:00:00', NULL); INSERT INTO `activity_info` VALUES (5, '联想活动专场满减', '3101', '联想活动专场满减', '2022-01-13 01:01:54', '2023-06-19 00:00:00', '2022-05-27 00:00:00', NULL); INSERT INTO `activity_info` VALUES (6, 'TCL全场9折', '3103', 'TCL全场9折', '2022-01-13 01:01:54', '2023-06-19 00:00:00', '2022-05-27 00:00:00', NULL);
6.2 观察Kafka消费者
7、历史数据全量同步
我们已经实现了使用Maxwell实时增量同步MySQL变更数据的功能。但有时可能需要使用到MySQL数据库中从历史至今的一个完整的数据集。这就需要我们在进行增量同步之前,先进行一次历史数据的全量同步。这样就能保证得到一个完整的数据集。
1、maxwell-bootstrap
此时就需要使用 Maxwell提供的 bootstrap 功能来进行历史数据的全量同步,使用maxwell-bootstrap,命令如下:
/opt/module/maxwell/bin/maxwell-bootstrap --database hwadee01 --table activity_info --config /opt/module/maxwell/config.properties
#注意:maxwell-bootstrap 只是把表的数据查出来,但自己本身没有将数据发送到kafka的能力,因此需要通过maxwell本身来发送,因此需要根据maxwell的client_id找到maxwell,而client_id在config.properties进行了配置。
脚本:
touch /usr/bin/mysql_to_kafka_init.sh
chmod 777 /usr/bin/mysql_to_kafka_init.sh
vi /usr/bin/mysql_to_kafka_init.sh
#!/bin/bash#该脚本的作用是初始化所有的业务数据,只需执行一次 MAXWELL_HOME=/opt/module/maxwellimport_data(){$MAXWELL_HOME/bin/maxwell-bootstrap --database hwadee01 --table $1 --config $MAXWELL_HOME/config.properties }case $1 in"activity_info" )import_data activity_info;;"sku_info" )import_data sku_info;;"all" )import_data activity_infoimport_data sku_info;; esac
历史数据进行全量同步:
指定要全量同步的表
/usr/bin/mysql_to_kafka_init.sh activity_info
2、执行后结果格式
cd /opt/module/kafka
bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --topic topic_db
注意事项:
(1)第一条type为bootstrap-start和最后一条type为bootstrap-complete的数据,是bootstrap开始和结束的标志,不包含数据,中间的type为bootstrap-insert的数据才包含数据。
(2)一次bootstrap输出的所有记录的ts都相同,为bootstrap开始的时间。