Maxwell: Introduction, Deployment, Architecture, and Usage
1. Maxwell Overview
1-1. Introduction to Maxwell
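Maxwell is an open-source MySQL change data capture (CDC) tool written in Java and released by the US company Zendesk. It monitors data changes (insert, update, delete) in a MySQL database in real time. It sends the changed data as JSON to Kafka, Kinesis, RabbitMQ, Redis, Google Cloud Pub/Sub, files, or other stream-processing platforms.
Maxwell official website: https://maxwells-daemon.io/
Maxwell GitHub repository: https://github.com/zendesk/maxwell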
1-2. Maxwell Output Format
mysql> update test set name = 'wang111' where id=1;
Query OK, 1 row affected (0.01 sec)
Maxwell turns the row change made by this statement into the following JSON:
{
  "database": "wangtingdb",
  "table": "test",
  "type": "update",
  "ts": 1676444034,
  "xid": 2569,
  "commit": true,
  "data": {
    "id": 1,
    "name": "wang111"
  },
  "old": {
    "name": "wang"
  }
}
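Field descriptions:
- database: the database the changed row belongs to
- table: the table the changed row belongs to
- type: the change type (insert, update, delete)
- ts: Unix timestamp in seconds of when the change occurred (e.g. 1676443644 -> 2023-02-15 14:47:24, UTC+8)
- xid: transaction ID
- commit: transaction commit flag, which can be used to reassemble transactions
- data:
  - for insert, the inserted row
  - for update, the row after the change
  - for delete, the deleted row
- old: for update only, the values before the change; it contains only the changed columns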
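The ts field is a Unix timestamp in seconds. The sketch below converts it to the UTC+8 wall-clock time used in this article. It assumes GNU coreutils date, since BSD/macOS date does not accept `-d @SECONDS`. The function name maxwell_ts_to_cst is hypothetical.

```shell
# Hypothetical helper: convert a Maxwell "ts" value (Unix epoch seconds)
# to UTC+8 local time (China Standard Time).
# Assumes GNU coreutils `date`.
maxwell_ts_to_cst() {
  local ts="$1"
  # Add a fixed +8h offset and format as UTC, so the result depends on
  # neither the machine's timezone nor installed tzdata.
  date -u -d "@$(( ts + 8 * 3600 ))" '+%Y-%m-%d %H:%M:%S'
}

maxwell_ts_to_cst 1676443644   # prints 2023-02-15 14:47:24
```

On a host with timezone data installed, `TZ=Asia/Shanghai date -d @1676443644` shows the same wall-clock time.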
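1-3. Maxwell Use Cases
Common use cases for Maxwell include:
- data synchronization for data-warehouse ETL
- cache maintenance
- collecting table-level DML metrics
- incremental indexing into search engines
- data partition migration
- binlog-based rollback plans for database cutovers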
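2. How Maxwell Works
Maxwell's principle is simple. It presents itself as a MySQL replica (slave) and follows the MySQL master-slave replication protocol to pull data from the master.
It reads the MySQL binary log (binlog) in real time and extracts the changed data. It then sends that data as JSON to Kafka or another stream-processing platform (Kafka is not the only output).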
- MySQL binary log
  - Binary log (binlog): one of the most important server-side logs in MySQL. It records all data changes made to the database, and its main uses are replication and data recovery. Maxwell's operation is closely tied to replication.
[wangting@hdt-dmcp-ops05 mysql]$ pwd
/var/lib/mysql
[wangting@hdt-dmcp-ops05 mysql]$ sudo ls -l | grep mysql-bin
-rw-r----- 1 mysql mysql 3040 Feb 15 15:05 mysql-bin.000001
-rw-r----- 1 mysql mysql   19 Feb 15 13:59 mysql-bin.index
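- MySQL replication
  - MySQL master-slave replication maintains a database environment identical to the master database; the copy is called the replica (slave) database.
  - Typical uses of replication:
    - Hot standby: if the master server fails, service can switch over to the replica.
    - Read/write splitting: the master handles only business writes while multiple replicas serve queries. In read-heavy, write-light workloads this improves overall efficiency.
- [Figure: how MySQL replication works]
  - How MySQL replication works:
    - The master receives a change request, applies it, and writes the change to its binary log.
    - The replica sends a dump request to the master and copies the master's binary log events into its own relay log.
    - The replica reads and replays the events in its relay log, applying the changes to its own data.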
3. Maxwell Installation and Deployment
[Note]:
Maxwell 1.30.0 and later no longer support JDK 1.8. The last release that supports JDK 1.8 is 1.29.2.
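The sketch below applies the version rule above on a given host. It reads the Java version string and maps it to a Maxwell release line. The helper names are hypothetical. The "1.30.0+" output only restates the note, so check the Maxwell release notes for the exact JDK each newer release needs.

```shell
# Hypothetical helper: extract the Java major version from a version string
# such as "1.8.0_212" (JDK 8) or "11.0.2" (JDK 11).
java_major_version() {
  local v="$1"
  # Old-style versions carry a "1." prefix: "1.8.0_212" -> "8.0_212"
  if [[ "$v" == 1.* ]]; then
    v="${v#1.}"
  fi
  # Keep everything before the first "." or "_": "8.0_212" -> "8"
  echo "${v%%[._]*}"
}

# Hypothetical helper: map a Java version string to a Maxwell release line,
# following the note above (1.29.2 is the last release supporting JDK 1.8).
recommend_maxwell_version() {
  local major
  major="$(java_major_version "$1")"
  if (( major <= 8 )); then
    echo "1.29.2"
  else
    echo "1.30.0+"
  fi
}

# Usage on a real host (java -version writes to stderr):
#   recommend_maxwell_version "$(java -version 2>&1 | awk -F'"' '/version/ {print $2}')"
```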
3-1. Download and Extract
# Download the package
[wangting@hdt-dmcp-ops05 software]$ wget https://github.com/zendesk/maxwell/releases/download/v1.29.2/maxwell-1.29.2.tar.gz
# Extract the package
[wangting@hdt-dmcp-ops05 software]$ tar -xf maxwell-1.29.2.tar.gz -C /opt/module/
[wangting@hdt-dmcp-ops05 software]$ mv /opt/module/maxwell-1.29.2 /opt/module/maxwell
# Directory layout
[wangting@hdt-dmcp-ops05 maxwell]$ ll
total 84
drwxrwxr-x 2 wangting wangting 4096 Feb 15 13:54 bin
-rw-r--r-- 1 wangting wangting 25133 Jan 25 2021 config.md
-rw-r--r-- 1 wangting wangting 11970 Jan 25 2021 config.properties.example
-rw-r--r-- 1 wangting wangting 10259 Apr 23 2020 kinesis-producer-library.properties.example
drwxr-xr-x 3 wangting wangting 12288 Jan 27 2021 lib
-rw-r--r-- 1 wangting wangting 548 Apr 23 2020 LICENSE
-rw-r--r-- 1 wangting wangting 470 Jan 25 2021 log4j2.xml
-rw-r--r-- 1 wangting wangting 3328 Jan 27 2021 quickstart.md
-rw-r--r-- 1 wangting wangting 1429 Jan 27 2021 README.md
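The download, extract, and rename steps above can be wrapped in a small function so the version is set in one place. This sketch assumes other releases use the same asset naming pattern as v1.29.2, so verify on the GitHub releases page. install_maxwell and maxwell_release_url are hypothetical names.

```shell
# Hypothetical helper: build the release tarball URL for a Maxwell version.
# Assumes the naming pattern seen for v1.29.2 holds for other versions.
maxwell_release_url() {
  local ver="$1"
  printf 'https://github.com/zendesk/maxwell/releases/download/v%s/maxwell-%s.tar.gz\n' "$ver" "$ver"
}

# Hypothetical helper: download, extract, and rename to a version-independent
# directory, mirroring the manual steps above.
install_maxwell() {
  local ver="$1" dest="${2:-/opt/module}"
  local tarball="maxwell-${ver}.tar.gz"
  wget -O "$tarball" "$(maxwell_release_url "$ver")" || return 1
  tar -xf "$tarball" -C "$dest" || return 1
  mv "$dest/maxwell-${ver}" "$dest/maxwell"
}

# Usage: install_maxwell 1.29.2 /opt/module
```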
3-2. Configure the Maxwell Metadata Database
- Enable MySQL binlog
[wangting@hdt-dmcp-ops05 maxwell]$ sudo vim /etc/my.cnf
[mysqld]
server-id=1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=wangtingdb
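- server-id=1: the server ID of this MySQL instance
- log-bin=mysql-bin: enables binlog; the value is used as the binlog file-name prefix
- binlog_format=row: the binlog format; Maxwell requires row
- binlog-do-db=wangtingdb: the database for which binlog is written; adjust to your environment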
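Before restarting, a quick check of the file can catch a missing setting. The sketch below greps a my.cnf for the three binlog settings listed above; check_maxwell_binlog_cnf is a hypothetical name. It does not check which [section] a line sits in. After the restart, the live values can be confirmed in the mysql client with `show variables like 'log_bin';` and `show variables like 'binlog_format';`.

```shell
# Hypothetical pre-flight check: verify that a my.cnf file contains the
# binlog settings Maxwell relies on. It only greps the file; it does not
# check the [mysqld] section or query the running server.
check_maxwell_binlog_cnf() {
  local cnf="$1" rc=0
  # log-bin may appear with a value (log-bin=mysql-bin) or on its own
  grep -Eiq '^[[:space:]]*log[-_]bin[[:space:]]*(=|$)' "$cnf" \
    || { echo "missing: log-bin"; rc=1; }
  grep -Eiq '^[[:space:]]*binlog[-_]format[[:space:]]*=[[:space:]]*row[[:space:]]*$' "$cnf" \
    || { echo "missing: binlog_format=row"; rc=1; }
  grep -Eiq '^[[:space:]]*server[-_]id[[:space:]]*=' "$cnf" \
    || { echo "missing: server-id"; rc=1; }
  return "$rc"
}

# Usage: check_maxwell_binlog_cnf /etc/my.cnf && echo "binlog settings look OK"
```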
- Restart MySQL to load the new settings
[wangting@hdt-dmcp-ops05 maxwell]$ sudo systemctl restart mysqld
- Create the database and user that Maxwell needs
[wangting@hdt-dmcp-ops05 maxwell]$ mysql -uroot -h172.20.12.179 -p
# Create the database
mysql> create database maxwell character set utf8mb4;
Query OK, 1 row affected (0.00 sec)
# Create the Maxwell user
mysql> create user 'maxwell'@'%' identified by 'maxwell';
Query OK, 0 rows affected (0.00 sec)
# Grant the required privileges
mysql> grant all on maxwell.* to 'maxwell'@'%';
Query OK, 0 rows affected (0.00 sec)
mysql> grant select, replication client, replication slave on *.* to 'maxwell'@'%';
Query OK, 0 rows affected (0.01 sec)
# Reload privileges
mysql> flush privileges;
Query OK, 0 rows affected (0.00 sec)
mysql> quit;
Bye
[wangting@hdt-dmcp-ops05 maxwell]$ mysql -umaxwell -pmaxwell -e "show databases;"
+--------------------+
| Database |
+--------------------+
| information_schema |
| wangtingdb |
| maxwell |
| mysql |
| performance_schema |
| sys |
+--------------------+
3-3. Configure Maxwell
[wangting@hdt-dmcp-ops05 maxwell]$ mv config.properties.example config.properties
[wangting@hdt-dmcp-ops05 maxwell]$ vim config.properties
# Destination for Maxwell output; options: stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis
producer=kafka
# Target Kafka cluster addresses
kafka.bootstrap.servers=hdt-dmcp-ops01:9092,hdt-dmcp-ops02:9092,hdt-dmcp-ops03:9092
# Target Kafka topic: static (e.g. maxwell) or dynamic (e.g. %{database}_%{table})
kafka_topic=maxwell
# MySQL connection settings
host=hdt-dmcp-ops05
user=maxwell
password=maxwell
jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai
Note: if Maxwell sends data to a Kafka cluster, start the Kafka cluster first.
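The sketch below shows what the dynamic topic form means. It reproduces the substitution for the two placeholders named in the config comment. With `kafka_topic=%{database}_%{table}`, changes to wangtingdb.test would go to the topic wangtingdb_test instead of maxwell. The helper resolve_topic is hypothetical and only illustrates the naming; Maxwell performs the substitution internally.

```shell
# Illustration only: mimic how a kafka_topic template maps a change event to
# a topic name. This hypothetical helper handles just the %{database} and
# %{table} placeholders; Maxwell does the real substitution itself.
resolve_topic() {
  local template="$1" db="$2" table="$3"
  # In sed's basic regex syntax, %, { and } are ordinary characters here.
  # Names containing "/" or "&" would need escaping; not handled in this sketch.
  printf '%s\n' "$template" | sed -e "s/%{database}/${db}/g" -e "s/%{table}/${table}/g"
}

resolve_topic '%{database}_%{table}' wangtingdb test   # prints wangtingdb_test
resolve_topic 'maxwell' wangtingdb test                # prints maxwell
```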
3-4. Starting and Stopping the Service
- Run directly from the command line
# Start Maxwell
[wangting@hdt-dmcp-ops05 maxwell]$ /opt/module/maxwell/bin/maxwell --config /opt/module/maxwell/config.properties --daemon
# Stop Maxwell
[wangting@hdt-dmcp-ops05 maxwell]$ ps -ef | grep maxwell | grep -v grep | awk '{print $2}' | xargs kill -9
- Start/stop script
[wangting@hdt-dmcp-ops05 bin]$ vim mymaxwell
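#!/bin/bash
# Helper script to start, stop, restart, or check Maxwell
MAXWELL_HOME=/opt/module/maxwell

# Exit status = number of running Maxwell processes (0 means not running)
status_maxwell(){
    result=$(ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | wc -l)
    return $result
}

start_maxwell(){
    status_maxwell
    if [[ $? -lt 1 ]]; then
        echo "Starting Maxwell"
        "$MAXWELL_HOME/bin/maxwell" --config "$MAXWELL_HOME/config.properties" --daemon
    else
        echo "Maxwell is already running"
    fi
}

stop_maxwell(){
    status_maxwell
    if [[ $? -gt 0 ]]; then
        echo "Stopping Maxwell"
        # Force-kill every Maxwell JVM that was found
        ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | awk '{print $2}' | xargs kill -9
    else
        echo "Maxwell is not running"
    fi
}

case $1 in
    start )
        start_maxwell
        ;;
    stop )
        stop_maxwell
        ;;
    restart )
        stop_maxwell
        sleep 1
        start_maxwell
        ;;
    status )
        status_maxwell
        if [[ $? -gt 0 ]]; then echo "Maxwell is running"; else echo "Maxwell is not running"; fi
        ;;
    * )
        echo "Usage: $0 {start|stop|restart|status}"
        ;;
esac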
[wangting@hdt-dmcp-ops05 bin]$ chmod +x mymaxwell
[wangting@hdt-dmcp-ops05 bin]$ mymaxwell stop
Stopping Maxwell
[wangting@hdt-dmcp-ops05 bin]$ mymaxwell start
Starting Maxwell
4. Using Maxwell
4-1. Incremental Sync Test
- Create a test table in the database for which binlog is enabled
[wangting@hdt-dmcp-ops05 maxwell]$ mysql -uroot -p123456
mysql> use wangtingdb;
Database changed
mysql> create table test(id int,name varchar(20));
Query OK, 0 rows affected (0.02 sec)
mysql> insert into test value(1,"wang");
Query OK, 1 row affected (0.01 sec)
mysql> insert into test value(2,"wang2");
Query OK, 1 row affected (0.01 sec)
mysql>
- Watch the Maxwell log
[wangting@hdt-dmcp-ops05 maxwell]$ tail -f logs/MaxwellDaemon.out
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
14:39:07,612 INFO AppInfoParser - Kafka version : 1.0.0
14:39:07,612 INFO AppInfoParser - Kafka commitId : aaa7af6d4a11b29d
14:39:07,626 INFO Maxwell - Maxwell v1.29.2 is booting (MaxwellKafkaProducer), starting at Position[BinlogPosition[mysql-bin.000001:977], lastHeartbeat=0]
14:39:07,756 INFO MysqlSavedSchema - Restoring schema id 1 (last modified at Position[BinlogPosition[mysql-bin.000001:977], lastHeartbeat=0])
14:39:07,809 INFO BinlogConnectorReplicator - Setting initial binlog pos to: mysql-bin.000001:977
14:39:07,819 INFO BinaryLogClient - Connected to hdt-dmcp-ops05:3306 at mysql-bin.000001/977 (sid:6379, cid:36)
14:39:07,819 INFO BinlogConnectorReplicator - Binlog connected.
14:44:48,333 INFO AbstractSchemaStore - storing schema @Position[BinlogPosition[mysql-bin.000001:1042], lastHeartbeat=0] after applying "create table test(id int,name varchar(20))" to wangtingdb, new schema id is 2
- Consume the messages with the Kafka console consumer
[wangting@hdt-dmcp-ops01 ~]$ kafka-console-consumer.sh --topic maxwell --from-beginning --bootstrap-server hdt-dmcp-ops01:9092,hdt-dmcp-ops02:9092,hdt-dmcp-ops03:9092
{"database":"wangtingdb","table":"test","type":"insert","ts":1676443636,"xid":1687,"commit":true,"data":{"id":1,"name":"wang"}}
{"database":"wangtingdb","table":"test","type":"insert","ts":1676443644,"xid":1709,"commit":true,"data":{"id":2,"name":"wang2"}}
Formatted for readability:
{
  "database": "wangtingdb",
  "table": "test",
  "type": "insert",
  "ts": 1676443636,
  "xid": 1687,
  "commit": true,
  "data": {
    "id": 1,
    "name": "wang"
  }
}
{
  "database": "wangtingdb",
  "table": "test",
  "type": "insert",
  "ts": 1676443644,
  "xid": 1709,
  "commit": true,
  "data": {
    "id": 2,
    "name": "wang2"
  }
}
- Update MySQL data
mysql> update test set name = 'wang111' where id=1;
Query OK, 1 row affected (0.01 sec)
Rows matched: 1 Changed: 1 Warnings: 0
Check Kafka again:
[wangting@hdt-dmcp-ops01 ~]$ kafka-console-consumer.sh --topic maxwell --from-beginning --bootstrap-server hdt-dmcp-ops01:9092,hdt-dmcp-ops02:9092,hdt-dmcp-ops03:9092
{"database":"wangtingdb","table":"test","type":"insert","ts":1676443636,"xid":1687,"commit":true,"data":{"id":1,"name":"wang"}}
{"database":"wangtingdb","table":"test","type":"insert","ts":1676443644,"xid":1709,"commit":true,"data":{"id":2,"name":"wang2"}}
# The following update message was added
{"database":"wangtingdb","table":"test","type":"update","ts":1676444034,"xid":2569,"commit":true,"data":{"id":1,"name":"wang111"},"old":{"name":"wang"}}
# Formatted:
{
  "database": "wangtingdb",
  "table": "test",
  "type": "update",
  "ts": 1676444034,
  "xid": 2569,
  "commit": true,
  "data": {
    "id": 1,
    "name": "wang111"
  },
  "old": {
    "name": "wang"
  }
}
- Delete MySQL data
mysql> delete from test where id = 2;
Query OK, 1 row affected (0.00 sec)
Check Kafka again:
[wangting@hdt-dmcp-ops01 ~]$ kafka-console-consumer.sh --topic maxwell --from-beginning --bootstrap-server hdt-dmcp-ops01:9092,hdt-dmcp-ops02:9092,hdt-dmcp-ops03:9092
{"database":"wangtingdb","table":"test","type":"insert","ts":1676443636,"xid":1687,"commit":true,"data":{"id":1,"name":"wang"}}
{"database":"wangtingdb","table":"test","type":"insert","ts":1676443644,"xid":1709,"commit":true,"data":{"id":2,"name":"wang2"}}
{"database":"wangtingdb","table":"test","type":"update","ts":1676444034,"xid":2569,"commit":true,"data":{"id":1,"name":"wang111"},"old":{"name":"wang"}}
# The following delete message was added
{"database":"wangtingdb","table":"test","type":"delete","ts":1676444127,"xid":2777,"commit":true,"data":{"id":2,"name":"wang2"}}
# Formatted:
{
  "database": "wangtingdb",
  "table": "test",
  "type": "delete",
  "ts": 1676444127,
  "xid": 2777,
  "commit": true,
  "data": {
    "id": 2,
    "name": "wang2"
  }
}
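The sketch below tallies events by table and change type from captured consumer output, which helps confirm at a glance that each insert, update and delete arrived. It relies on the key order "database","table","type" seen in the messages above; a real JSON parser such as jq would not. summarize_maxwell_events is a hypothetical name.

```shell
# Sketch: tally Maxwell change events per database.table and type from
# captured console-consumer output (one JSON object per line, read on stdin).
# Relies on the key order "database","table","type" seen in the output above.
summarize_maxwell_events() {
  sed -nE 's/.*"database":"([^"]*)","table":"([^"]*)","type":"([^"]*)".*/\1.\2 \3/p' \
    | sort | uniq -c \
    | awk '{ print $2, $3, $1 }'   # reorder to: db.table type count
}

# Usage: the console consumer runs until stopped, so add --timeout-ms to let
# the pipeline finish, e.g.
#   kafka-console-consumer.sh --topic maxwell --from-beginning \
#     --bootstrap-server hdt-dmcp-ops01:9092 --timeout-ms 10000 | summarize_maxwell_events
```

For the four messages in this section, it prints `wangtingdb.test delete 1`, `wangtingdb.test insert 2` and `wangtingdb.test update 1`, one per line.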
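4-2. Full Data Sync with Maxwell
Using the maxwell-bootstrap command
Maxwell provides the bootstrap command for a full sync of historical data (a running Maxwell process is still required).
- Create sample data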
mysql> select * from test;
+------+---------+
| id | name |
+------+---------+
| 1 | wang111 |
+------+---------+
1 row in set (0.00 sec)
mysql> insert into test value(2,"wang222");
Query OK, 1 row affected (0.00 sec)
mysql> insert into test value(3,"wang333");
Query OK, 1 row affected (0.00 sec)
mysql> insert into test value(4,"wang444");
Query OK, 1 row affected (0.01 sec)
mysql> select * from test;
+------+---------+
| id | name |
+------+---------+
| 1 | wang111 |
| 2 | wang222 |
| 3 | wang333 |
| 4 | wang444 |
+------+---------+
4 rows in set (0.00 sec)
- Open a Kafka consumer window beforehand
[wangting@hdt-dmcp-ops01 ~]$ kafka-console-consumer.sh --topic maxwell --bootstrap-server hdt-dmcp-ops01:9092,hdt-dmcp-ops02:9092,hdt-dmcp-ops03:9092
# No messages yet; the consumer is waiting
- Start the full sync of historical data
# Check whether the Maxwell service is running
[wangting@hdt-dmcp-ops05 bin]$ ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep
wangting 22431 1 0 15:06 pts/2 00:00:08 /opt/module/java/bin/java -Dfile.encoding=UTF-8 -Dlog4j.shutdownCallbackRegistry=com.djdch.log4j.StaticShutdownCallbackRegistry -cp :/opt/module/maxwell/bin/../lib/*:/opt/module/maxwell/bin/../lib/kafka-clients/kafka-clients-1.0.0.jar com.zendesk.maxwell.Maxwell --config /opt/module/maxwell/config.properties --daemon
#
[wangting@hdt-dmcp-ops05 bin]$ cd /opt/module/maxwell/
[wangting@hdt-dmcp-ops05 maxwell]$ bin/maxwell-bootstrap --database wangtingdb --table test --config config.properties
connecting to jdbc:mysql://hdt-dmcp-ops05:3306/maxwell?allowPublicKeyRetrieval=true&connectTimeout=5000&serverTimezone=Asia%2FShanghai&zeroDateTimeBehavior=convertToNull&useSSL=false
- Back in the Kafka consumer window
[wangting@hdt-dmcp-ops01 ~]$ kafka-console-consumer.sh --topic maxwell --bootstrap-server hdt-dmcp-ops01:9092,hdt-dmcp-ops02:9092,hdt-dmcp-ops03:9092
{"database":"wangtingdb","table":"test","type":"bootstrap-start","ts":1676444947,"data":{}}
{"database":"wangtingdb","table":"test","type":"bootstrap-insert","ts":1676444947,"data":{"id":1,"name":"wang111"}}
{"database":"wangtingdb","table":"test","type":"bootstrap-insert","ts":1676444947,"data":{"id":2,"name":"wang222"}}
{"database":"wangtingdb","table":"test","type":"bootstrap-insert","ts":1676444947,"data":{"id":3,"name":"wang333"}}
{"database":"wangtingdb","table":"test","type":"bootstrap-insert","ts":1676444947,"data":{"id":4,"name":"wang444"}}
{"database":"wangtingdb","table":"test","type":"bootstrap-complete","ts":1676444947,"data":{}}
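[Note]:
- Although the table has only 4 rows, 6 messages were produced.
- The first message (type bootstrap-start) and the last (type bootstrap-complete) mark the start and end of the bootstrap and carry no data. Only the bootstrap-insert messages in between contain rows.
- All records emitted by one bootstrap run share the same ts, the bootstrap start time: 1676444947 -> 2023-02-15 15:09:07.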
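Because only the bootstrap-insert messages carry rows, a simple completeness check is to count them for a table and compare with the source row count. The sketch below does this on captured consumer output. count_bootstrap_rows is a hypothetical name, and it assumes the key order "database","table","type" shown above. The source count can be taken with `mysql -N -e "select count(*) from wangtingdb.test"`.

```shell
# Sketch: count the data-carrying bootstrap rows for one table in captured
# consumer output (stdin), ignoring the bootstrap-start/bootstrap-complete
# markers. Assumes the key order "database","table","type" shown above.
count_bootstrap_rows() {
  local db="$1" table="$2"
  # grep -c prints 0 (and exits 1) when nothing matches
  grep -cF "\"database\":\"${db}\",\"table\":\"${table}\",\"type\":\"bootstrap-insert\""
}

# Usage: count_bootstrap_rows wangtingdb test < consumer_output.txt
```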