Maxwell: Introduction, Deployment, Internals, and Usage

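1.Maxwell overview

1-1.Introduction to Maxwell

Maxwell is an open-source MySQL change data capture (CDC) tool written in Java and released by the US company Zendesk. It monitors data changes in a MySQL database in real time (insert, update, and delete) and sends each change as JSON to a streaming platform such as Kafka, Kinesis, RabbitMQ, Redis, Google Cloud Pub/Sub, a file, or other targets.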

Maxwell project website: https://maxwells-daemon.io/
Maxwell GitHub repository: https://github.com/zendesk/maxwell

1-2.Maxwell output data format

mysql> update test set name = 'wang111' where id=1;
Query OK, 1 row affected (0.01 sec)

Maxwell converts this SQL change into the following JSON:

{
	"database": "wangtingdb",
	"table": "test",
	"type": "update",
	"ts": 1676444034,
	"xid": 2569,
	"commit": true,
	"data": {
		"id": 1,
		"name": "wang111"
	},
	"old": {
		"name": "wang"
	}
}

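Field descriptions:

  • database # database that the changed row belongs to

  • table # table that the changed row belongs to

  • type # type of change (insert, update, delete)

  • ts # Unix timestamp in seconds at which the change occurred (for example, 1676443644 -> 2023-02-15 14:47:24 in UTC+8)

  • xid # transaction ID

  • commit # transaction commit flag; can be used to reassemble transactions

  • data

    • for insert, data holds the inserted row
    • for update, data holds the row after the change
    • for delete, data holds the deleted row
  • old # for update, holds the values before the change; only the changed columns are included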
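The fields above can be read with any JSON library. The following is a minimal Python sketch, not part of Maxwell itself, that parses the update message from this section, converts ts to a readable time, and rebuilds the row as it looked before the change by overlaying old on data. The function name describe_event and the UTC+8 display offset are assumptions made for illustration.

```python
import json
from datetime import datetime, timedelta, timezone

# The update event from this section, written on one line as Maxwell emits it.
RAW_EVENT = (
    '{"database":"wangtingdb","table":"test","type":"update",'
    '"ts":1676444034,"xid":2569,"commit":true,'
    '"data":{"id":1,"name":"wang111"},"old":{"name":"wang"}}'
)

# Assumption: timestamps are displayed in UTC+8, as in the article's examples.
UTC_PLUS_8 = timezone(timedelta(hours=8))


def describe_event(raw: str) -> dict:
    """Parse one Maxwell JSON message (hypothetical helper for illustration)."""
    event = json.loads(raw)
    after = event["data"]
    before = None
    if event["type"] == "update":
        # "old" carries only the changed columns, so overlay it on the new row.
        before = {**after, **event.get("old", {})}
    changed_at = datetime.fromtimestamp(event["ts"], tz=UTC_PLUS_8)
    return {
        "target": f'{event["database"]}.{event["table"]}',
        "type": event["type"],
        "changed_at": changed_at.strftime("%Y-%m-%d %H:%M:%S"),
        "before": before,
        "after": after,
    }


summary = describe_event(RAW_EVENT)
print(summary)
```

Because old only lists the changed columns, the overlay step is what recovers the full previous row.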

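1-3.Maxwell use cases

Common uses of Maxwell include data synchronization for data warehouse ETL, cache maintenance, collecting table-level DML metrics, incremental feeds to search engines, data partition migration, and binlog-based rollback plans for database cutovers.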
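As an illustration of the table-level DML metrics use case, the sketch below counts Maxwell messages per database, table, and change type. It assumes the messages arrive as one JSON document per line; the function name dml_metrics and the sample messages are made up for this example.

```python
import json
from collections import Counter
from typing import Iterable


def dml_metrics(lines: Iterable[str]) -> Counter:
    """Count messages per (database, table, type). Hypothetical helper."""
    counts = Counter()
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines between messages
        event = json.loads(line)
        counts[(event["database"], event["table"], event["type"])] += 1
    return counts


# Sample messages in Maxwell's output format (illustrative values only).
SAMPLE = [
    '{"database":"wangtingdb","table":"test","type":"insert","data":{"id":1}}',
    '{"database":"wangtingdb","table":"test","type":"insert","data":{"id":2}}',
    '{"database":"wangtingdb","table":"test","type":"update","data":{"id":1}}',
    '{"database":"wangtingdb","table":"test","type":"delete","data":{"id":2}}',
]

metrics = dml_metrics(SAMPLE)
for (database, table, kind), count in sorted(metrics.items()):
    print(f"{database}.{table} {kind}: {count}")
```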

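2.Maxwell architecture and principles

Maxwell's principle is simple: it poses as a MySQL slave and follows the MySQL master-slave replication protocol to synchronize data from the master.

It reads the MySQL binary log (binlog) in real time, extracts the changed data, and sends it as JSON to a stream processing platform such as Kafka (Kafka is not the only output).

  • MySQL binary log

    • The binary log (binlog) is one of the most important MySQL server logs. It records every data change made to the database. Its main uses are master-slave replication and data recovery. Maxwell's operation is closely tied to master-slave replication.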
    [wangting@hdt-dmcp-ops05 mysql]$ pwd
    /var/lib/mysql
    [wangting@hdt-dmcp-ops05 mysql]$ sudo ls -l | grep mysql-bin
    -rw-r----- 1 mysql mysql     3040 Feb 15 15:05 mysql-bin.000001
    -rw-r----- 1 mysql mysql       19 Feb 15 13:59 mysql-bin.index
    
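  • MySQL master-slave replication

    • Master-slave replication builds a database environment identical to the master database; that copy is called the slave database.
    • Use cases for master-slave replication:
      • Hot standby: if the master database server fails, work can switch over to the slave.
      • Read/write splitting: the master handles only writes of business data while several slaves handle only queries, which improves throughput in read-heavy, write-light workloads.
  • MySQL master-slave replication diagram

  • How MySQL master-slave replication works
    • The master receives a data change request, applies the change, and writes it to the binary log.
    • The slave sends a binlog dump request to the master and copies the master's binary log events into its relay log.
    • The slave reads and replays the events in the relay log, applying the updates to its own database.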
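The three steps above can be modelled with a toy simulation. This is only a sketch of the idea (an append-only log, a position, and a replay loop); it does not speak the real MySQL replication protocol, and the class and method names are invented for illustration.

```python
from dataclasses import dataclass, field


@dataclass
class Master:
    rows: dict = field(default_factory=dict)
    binlog: list = field(default_factory=list)

    def write(self, op, key, value=None):
        # Step 1: apply the change, then append it to the binary log.
        if op == "delete":
            self.rows.pop(key, None)
        else:
            self.rows[key] = value
        self.binlog.append((op, key, value))

    def dump(self, position):
        # Master side of step 2: return the events after the given position.
        return self.binlog[position:]


@dataclass
class Slave:
    rows: dict = field(default_factory=dict)
    relay_log: list = field(default_factory=list)
    fetched: int = 0  # binlog events already copied into the relay log
    applied: int = 0  # relay log events already replayed

    def fetch(self, master):
        # Step 2: request new events and copy them into the relay log.
        events = master.dump(self.fetched)
        self.relay_log.extend(events)
        self.fetched += len(events)

    def replay(self):
        # Step 3: replay the relay log events that were not applied yet.
        for op, key, value in self.relay_log[self.applied:]:
            if op == "delete":
                self.rows.pop(key, None)
            else:
                self.rows[key] = value
        self.applied = len(self.relay_log)


master, slave = Master(), Slave()
master.write("insert", 1, "wang")
master.write("insert", 2, "wang2")
master.write("update", 1, "wang111")
master.write("delete", 2)
slave.fetch(master)
slave.replay()
print(slave.rows == master.rows)
```

Maxwell plays the role of the slave's fetch step only: instead of replaying the events into its own tables, it converts them to JSON and forwards them.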

3.Maxwell installation and deployment

[Note]:

Maxwell 1.30.0 and later no longer support JDK 1.8; the last release that supports JDK 1.8 is 1.29.2.

3-1.Extract and install

# Download the release package
[wangting@hdt-dmcp-ops05 software]$ wget https://github.com/zendesk/maxwell/releases/download/v1.29.2/maxwell-1.29.2.tar.gz
# Extract the package
[wangting@hdt-dmcp-ops05 software]$ tar -xf maxwell-1.29.2.tar.gz -C /opt/module/
[wangting@hdt-dmcp-ops05 software]$ mv /opt/module/maxwell-1.29.2 /opt/module/maxwell
# Directory layout
[wangting@hdt-dmcp-ops05 maxwell]$ ll
total 84
drwxrwxr-x 2 wangting wangting  4096 Feb 15 13:54 bin
-rw-r--r-- 1 wangting wangting 25133 Jan 25  2021 config.md
-rw-r--r-- 1 wangting wangting 11970 Jan 25  2021 config.properties.example
-rw-r--r-- 1 wangting wangting 10259 Apr 23  2020 kinesis-producer-library.properties.example
drwxr-xr-x 3 wangting wangting 12288 Jan 27  2021 lib
-rw-r--r-- 1 wangting wangting   548 Apr 23  2020 LICENSE
-rw-r--r-- 1 wangting wangting   470 Jan 25  2021 log4j2.xml
-rw-r--r-- 1 wangting wangting  3328 Jan 27  2021 quickstart.md
-rw-r--r-- 1 wangting wangting  1429 Jan 27  2021 README.md

3-2.Configure the Maxwell metadata database

  • Enable the MySQL binlog
[wangting@hdt-dmcp-ops05 maxwell]$ sudo vim /etc/my.cnf

[mysqld]
server-id=1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=wangtingdb
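  • server-id=1

    • Server ID of this MySQL instance
  • log-bin=mysql-bin

    • Enables the binlog; the value is used as the binlog file name prefix
  • binlog_format=row

    • Binlog format; Maxwell requires row
  • binlog-do-db=wangtingdb

    • Database for which the binlog is enabled; adjust to your environment
  • Restart MySQL to load the new configuration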
[wangting@hdt-dmcp-ops05 maxwell]$ sudo systemctl restart mysqld
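As a quick sanity check of the [mysqld] settings described above, the following Python sketch parses a my.cnf fragment with the standard configparser module and reports anything missing. It is an illustration only: the function name check_mysqld_settings is hypothetical, the fragment is passed in as a string, and a real my.cnf may contain directives such as !includedir that configparser cannot read.

```python
import configparser

# Settings this article relies on; None means "any value is fine".
REQUIRED = {"server_id": None, "log_bin": None, "binlog_format": "row"}

# The fragment from this section.
ARTICLE_CNF = """
[mysqld]
server-id=1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=wangtingdb
"""


def check_mysqld_settings(cnf_text: str) -> list:
    """Return a list of problems found in the [mysqld] section."""
    parser = configparser.ConfigParser(
        allow_no_value=True, strict=False, interpolation=None
    )
    parser.read_string(cnf_text)
    if not parser.has_section("mysqld"):
        return ["missing [mysqld] section"]
    # MySQL accepts '-' and '_' interchangeably in option names.
    settings = {
        key.replace("-", "_"): (value or "")
        for key, value in parser.items("mysqld")
    }
    problems = []
    for name, expected in REQUIRED.items():
        if name not in settings:
            problems.append(f"{name} is not set")
        elif expected is not None and settings[name].lower() != expected:
            problems.append(f"{name} should be {expected}, found {settings[name]}")
    return problems


print(check_mysqld_settings(ARTICLE_CNF))
```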
  • Create the database and user that Maxwell needs
[wangting@hdt-dmcp-ops05 maxwell]$ mysql -uroot -h172.20.12.179 -p
# Create the database
mysql> create database maxwell character set utf8mb4;
Query OK, 1 row affected (0.00 sec)
# Create the Maxwell user
mysql> create user 'maxwell'@'%' identified by 'maxwell';
Query OK, 0 rows affected (0.00 sec)
# Grant the required privileges
mysql> grant all on maxwell.* to 'maxwell'@'%';
Query OK, 0 rows affected (0.00 sec)
mysql> grant select, replication client, replication slave on *.* to 'maxwell'@'%';
Query OK, 0 rows affected (0.01 sec)
# Reload the privilege tables
mysql> flush privileges;
Query OK, 0 rows affected (0.00 sec)
mysql> quit;
Bye

[wangting@hdt-dmcp-ops05 maxwell]$ mysql -umaxwell -pmaxwell -e "show databases;"
+--------------------+
| Database           |
+--------------------+
| information_schema |
| wangtingdb         |
| maxwell            |
| mysql              |
| performance_schema |
| sys                |
+--------------------+

3-3.Configure Maxwell

[wangting@hdt-dmcp-ops05 maxwell]$ mv config.properties.example config.properties
[wangting@hdt-dmcp-ops05 maxwell]$ vim config.properties
# Where Maxwell sends data; options: stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis
producer=kafka
# Target Kafka cluster address
kafka.bootstrap.servers=hdt-dmcp-ops01:9092,hdt-dmcp-ops02:9092,hdt-dmcp-ops03:9092
# Target Kafka topic; can be static, e.g. maxwell, or dynamic, e.g. %{database}_%{table}
kafka_topic=maxwell

# MySQL settings
host=hdt-dmcp-ops05
user=maxwell
password=maxwell
jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai

Note: if Maxwell sends data to a Kafka cluster, start the Kafka cluster first.
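To make the dynamic topic option more concrete, here is a small Python sketch of how a template such as %{database}_%{table} maps an event to a topic name. It only illustrates the substitution idea and is not Maxwell's own implementation; the function name resolve_topic is hypothetical.

```python
import re


def resolve_topic(template: str, event: dict) -> str:
    """Replace each %{field} placeholder with the event's value for that field."""

    def substitute(match):
        # An unknown placeholder raises KeyError, which surfaces typos early.
        return str(event[match.group(1)])

    return re.sub(r"%\{(\w+)\}", substitute, template)


event = {"database": "wangtingdb", "table": "test", "type": "insert"}
print(resolve_topic("maxwell", event))
print(resolve_topic("%{database}_%{table}", event))
```

With the static value every table shares one topic; with the dynamic template each table gets its own topic, which makes per-table consumers simpler at the cost of more topics to manage.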

3-4.Start and stop the service from the command line

  • Using the commands directly
# Start Maxwell
[wangting@hdt-dmcp-ops05 maxwell]$ /opt/module/maxwell/bin/maxwell --config /opt/module/maxwell/config.properties --daemon

# Stop Maxwell
[wangting@hdt-dmcp-ops05 maxwell]$ ps -ef | grep maxwell | grep -v grep | awk '{print $2}' | xargs kill -9
  • Start/stop script
[wangting@hdt-dmcp-ops05 bin]$ vim mymaxwell
#!/bin/bash
# Helper script to start, stop, or restart the Maxwell daemon.

MAXWELL_HOME=/opt/module/maxwell

# Return the number of running Maxwell processes as the exit status.
status_maxwell(){
    result=`ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | wc -l`
    return $result
}


start_maxwell(){
    status_maxwell
    if [[ $? -lt 1 ]]; then
        echo "Starting Maxwell"
        $MAXWELL_HOME/bin/maxwell --config $MAXWELL_HOME/config.properties --daemon
    else
        echo "Maxwell is already running"
    fi
}


stop_maxwell(){
    status_maxwell
    if [[ $? -gt 0 ]]; then
        echo "Stopping Maxwell"
        ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | awk '{print $2}' | xargs kill -9
    else
        echo "Maxwell is not running"
    fi
}


case $1 in
    start )
        start_maxwell
    ;;
    stop )
        stop_maxwell
    ;;
    restart )
        stop_maxwell
        sleep 1
        start_maxwell
    ;;
    * )
        echo "Usage: $0 {start|stop|restart}"
    ;;
esac
[wangting@hdt-dmcp-ops05 bin]$ chmod +x mymaxwell
[wangting@hdt-dmcp-ops05 bin]$ mymaxwell stop
Stopping Maxwell
[wangting@hdt-dmcp-ops05 bin]$ mymaxwell start
Starting Maxwell

4.Using Maxwell

4-1.Incremental data synchronization test

  • Create a test table in the database that has binlog enabled
[wangting@hdt-dmcp-ops05 maxwell]$ mysql -uroot -p123456

mysql> use wangtingdb;
Database changed

mysql> create table test(id int,name varchar(20));
Query OK, 0 rows affected (0.02 sec)

mysql> insert into test value(1,"wang");
Query OK, 1 row affected (0.01 sec)

mysql> insert into test value(2,"wang2");
Query OK, 1 row affected (0.01 sec)

mysql>
  • Watch the Maxwell log
[wangting@hdt-dmcp-ops05 maxwell]$ tail -f logs/MaxwellDaemon.out
        value.serializer = class org.apache.kafka.common.serialization.StringSerializer

14:39:07,612 INFO  AppInfoParser - Kafka version : 1.0.0
14:39:07,612 INFO  AppInfoParser - Kafka commitId : aaa7af6d4a11b29d
14:39:07,626 INFO  Maxwell - Maxwell v1.29.2 is booting (MaxwellKafkaProducer), starting at Position[BinlogPosition[mysql-bin.000001:977], lastHeartbeat=0]
14:39:07,756 INFO  MysqlSavedSchema - Restoring schema id 1 (last modified at Position[BinlogPosition[mysql-bin.000001:977], lastHeartbeat=0])
14:39:07,809 INFO  BinlogConnectorReplicator - Setting initial binlog pos to: mysql-bin.000001:977
14:39:07,819 INFO  BinaryLogClient - Connected to hdt-dmcp-ops05:3306 at mysql-bin.000001/977 (sid:6379, cid:36)
14:39:07,819 INFO  BinlogConnectorReplicator - Binlog connected.
14:44:48,333 INFO  AbstractSchemaStore - storing schema @Position[BinlogPosition[mysql-bin.000001:1042], lastHeartbeat=0] after applying "create table test(id int,name varchar(20))" to wangtingdb, new schema id is 2
  • Open a Kafka console consumer to read the messages
[wangting@hdt-dmcp-ops01 ~]$ kafka-console-consumer.sh --topic maxwell --from-beginning --bootstrap-server hdt-dmcp-ops01:9092,hdt-dmcp-ops02:9092,hdt-dmcp-ops03:9092
{"database":"wangtingdb","table":"test","type":"insert","ts":1676443636,"xid":1687,"commit":true,"data":{"id":1,"name":"wang"}}
{"database":"wangtingdb","table":"test","type":"insert","ts":1676443644,"xid":1709,"commit":true,"data":{"id":2,"name":"wang2"}}

The messages formatted for readability:

{
	"database": "wangtingdb",
	"table": "test",
	"type": "insert",
	"ts": 1676443636,
	"xid": 1687,
	"commit": true,
	"data": {
		"id": 1,
		"name": "wang"
	}
}

{
	"database": "wangtingdb",
	"table": "test",
	"type": "insert",
	"ts": 1676443644,
	"xid": 1709,
	"commit": true,
	"data": {
		"id": 2,
		"name": "wang2"
	}
}
  • Update the MySQL data
mysql> update test set name = 'wang111' where id=1;
Query OK, 1 row affected (0.01 sec)
Rows matched: 1  Changed: 1  Warnings: 0

Check the change in Kafka

[wangting@hdt-dmcp-ops01 ~]$ kafka-console-consumer.sh --topic maxwell --from-beginning --bootstrap-server hdt-dmcp-ops01:9092,hdt-dmcp-ops02:9092,hdt-dmcp-ops03:9092
{"database":"wangtingdb","table":"test","type":"insert","ts":1676443636,"xid":1687,"commit":true,"data":{"id":1,"name":"wang"}}
{"database":"wangtingdb","table":"test","type":"insert","ts":1676443644,"xid":1709,"commit":true,"data":{"id":2,"name":"wang2"}}
# The following update message was added
{"database":"wangtingdb","table":"test","type":"update","ts":1676444034,"xid":2569,"commit":true,"data":{"id":1,"name":"wang111"},"old":{"name":"wang"}}
# Formatted:
{
	"database": "wangtingdb",
	"table": "test",
	"type": "update",
	"ts": 1676444034,
	"xid": 2569,
	"commit": true,
	"data": {
		"id": 1,
		"name": "wang111"
	},
	"old": {
		"name": "wang"
	}
}
  • Delete MySQL data
mysql> delete from test where id = 2;
Query OK, 1 row affected (0.00 sec)

Check the change in Kafka

[wangting@hdt-dmcp-ops01 ~]$ kafka-console-consumer.sh --topic maxwell --from-beginning --bootstrap-server hdt-dmcp-ops01:9092,hdt-dmcp-ops02:9092,hdt-dmcp-ops03:9092
{"database":"wangtingdb","table":"test","type":"insert","ts":1676443636,"xid":1687,"commit":true,"data":{"id":1,"name":"wang"}}
{"database":"wangtingdb","table":"test","type":"insert","ts":1676443644,"xid":1709,"commit":true,"data":{"id":2,"name":"wang2"}}
{"database":"wangtingdb","table":"test","type":"update","ts":1676444034,"xid":2569,"commit":true,"data":{"id":1,"name":"wang111"},"old":{"name":"wang"}}
# The following delete message was added
{"database":"wangtingdb","table":"test","type":"delete","ts":1676444127,"xid":2777,"commit":true,"data":{"id":2,"name":"wang2"}}
# Formatted:
{
	"database": "wangtingdb",
	"table": "test",
	"type": "delete",
	"ts": 1676444127,
	"xid": 2777,
	"commit": true,
	"data": {
		"id": 2,
		"name": "wang2"
	}
}
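A downstream consumer can rebuild the current table contents by applying these messages in order. The sketch below replays the four messages from this test into an in-memory dict; it assumes the id column identifies a row (the test table declares no primary key), and the function name apply_event is hypothetical.

```python
import json

# The four messages produced in this section, in the order Kafka delivered them.
MESSAGES = [
    '{"database":"wangtingdb","table":"test","type":"insert","ts":1676443636,"xid":1687,"commit":true,"data":{"id":1,"name":"wang"}}',
    '{"database":"wangtingdb","table":"test","type":"insert","ts":1676443644,"xid":1709,"commit":true,"data":{"id":2,"name":"wang2"}}',
    '{"database":"wangtingdb","table":"test","type":"update","ts":1676444034,"xid":2569,"commit":true,"data":{"id":1,"name":"wang111"},"old":{"name":"wang"}}',
    '{"database":"wangtingdb","table":"test","type":"delete","ts":1676444127,"xid":2777,"commit":true,"data":{"id":2,"name":"wang2"}}',
]


def apply_event(state: dict, event: dict) -> dict:
    """Apply one change message to a dict keyed by the row's id (assumed key)."""
    key = event["data"]["id"]
    if event["type"] in ("insert", "update"):
        # For both types, data holds the full row after the change.
        state[key] = event["data"]
    elif event["type"] == "delete":
        state.pop(key, None)
    return state


table_state = {}
for raw in MESSAGES:
    apply_event(table_state, json.loads(raw))

print(table_state)
```

The result matches what select * from test returns after the delete: a single row with id 1 and name wang111.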

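4-2.Maxwell full data synchronization

Using the maxwell-bootstrap command

Maxwell provides a bootstrap command for full synchronization of historical data (a Maxwell instance must still be running for it to work).

  • Create sample data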
mysql> select * from test;
+------+---------+
| id   | name    |
+------+---------+
|    1 | wang111 |
+------+---------+
1 row in set (0.00 sec)

mysql> insert into test value(2,"wang222");
Query OK, 1 row affected (0.00 sec)

mysql> insert into test value(3,"wang333");
Query OK, 1 row affected (0.00 sec)

mysql> insert into test value(4,"wang444");
Query OK, 1 row affected (0.01 sec)

mysql> select * from test;
+------+---------+
| id   | name    |
+------+---------+
|    1 | wang111 |
|    2 | wang222 |
|    3 | wang333 |
|    4 | wang444 |
+------+---------+
4 rows in set (0.00 sec)
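  • Open a Kafka consumer window in advance
[wangting@hdt-dmcp-ops01 ~]$ kafka-console-consumer.sh --topic maxwell --bootstrap-server hdt-dmcp-ops01:9092,hdt-dmcp-ops02:9092,hdt-dmcp-ops03:9092

# No messages yet; the consumer is waiting
  • Start the full synchronization of historical data
# Check that the Maxwell service is running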
[wangting@hdt-dmcp-ops05 bin]$ ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep
wangting 22431     1  0 15:06 pts/2    00:00:08 /opt/module/java/bin/java -Dfile.encoding=UTF-8 -Dlog4j.shutdownCallbackRegistry=com.djdch.log4j.StaticShutdownCallbackRegistry -cp :/opt/module/maxwell/bin/../lib/*:/opt/module/maxwell/bin/../lib/kafka-clients/kafka-clients-1.0.0.jar com.zendesk.maxwell.Maxwell --config /opt/module/maxwell/config.properties --daemon
# Run the bootstrap for the wangtingdb.test table
[wangting@hdt-dmcp-ops05 bin]$ cd /opt/module/maxwell/
[wangting@hdt-dmcp-ops05 maxwell]$ bin/maxwell-bootstrap --database wangtingdb --table test --config config.properties
connecting to jdbc:mysql://hdt-dmcp-ops05:3306/maxwell?allowPublicKeyRetrieval=true&connectTimeout=5000&serverTimezone=Asia%2FShanghai&zeroDateTimeBehavior=convertToNull&useSSL=false
  • Return to the Kafka consumer window
[wangting@hdt-dmcp-ops01 ~]$ kafka-console-consumer.sh --topic maxwell --bootstrap-server hdt-dmcp-ops01:9092,hdt-dmcp-ops02:9092,hdt-dmcp-ops03:9092
{"database":"wangtingdb","table":"test","type":"bootstrap-start","ts":1676444947,"data":{}}
{"database":"wangtingdb","table":"test","type":"bootstrap-insert","ts":1676444947,"data":{"id":1,"name":"wang111"}}
{"database":"wangtingdb","table":"test","type":"bootstrap-insert","ts":1676444947,"data":{"id":2,"name":"wang222"}}
{"database":"wangtingdb","table":"test","type":"bootstrap-insert","ts":1676444947,"data":{"id":3,"name":"wang333"}}
{"database":"wangtingdb","table":"test","type":"bootstrap-insert","ts":1676444947,"data":{"id":4,"name":"wang444"}}
{"database":"wangtingdb","table":"test","type":"bootstrap-complete","ts":1676444947,"data":{}}

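[Note]:

  1. The table has 4 rows, but 6 messages are produced.
  2. The first message (type bootstrap-start) and the last (type bootstrap-complete) mark the start and end of the bootstrap and carry no row data; only the bootstrap-insert messages in between contain rows.
  3. All records produced by one bootstrap run share the same ts, the time the bootstrap started: 1676444947 -> 2023-02-15 15:09:07 (UTC+8).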
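A consumer therefore has to treat the start and complete markers separately from the rows. The following Python sketch collects the rows of one bootstrap run from the six messages above and checks that they share one ts. It is a minimal illustration; the function name collect_bootstrap is hypothetical.

```python
import json

# The six messages produced by the bootstrap run in this section.
BOOTSTRAP_MESSAGES = [
    '{"database":"wangtingdb","table":"test","type":"bootstrap-start","ts":1676444947,"data":{}}',
    '{"database":"wangtingdb","table":"test","type":"bootstrap-insert","ts":1676444947,"data":{"id":1,"name":"wang111"}}',
    '{"database":"wangtingdb","table":"test","type":"bootstrap-insert","ts":1676444947,"data":{"id":2,"name":"wang222"}}',
    '{"database":"wangtingdb","table":"test","type":"bootstrap-insert","ts":1676444947,"data":{"id":3,"name":"wang333"}}',
    '{"database":"wangtingdb","table":"test","type":"bootstrap-insert","ts":1676444947,"data":{"id":4,"name":"wang444"}}',
    '{"database":"wangtingdb","table":"test","type":"bootstrap-complete","ts":1676444947,"data":{}}',
]


def collect_bootstrap(lines):
    """Return (rows, timestamps, completed) for one bootstrap run."""
    rows, timestamps = [], set()
    in_progress, completed = False, False
    for line in lines:
        event = json.loads(line)
        kind = event["type"]
        if kind == "bootstrap-start":
            # Marker only: reset and begin collecting rows.
            rows, in_progress, completed = [], True, False
        elif kind == "bootstrap-insert" and in_progress:
            rows.append(event["data"])
        elif kind == "bootstrap-complete" and in_progress:
            # Marker only: the snapshot is now complete.
            in_progress, completed = False, True
        else:
            continue  # ignore messages outside a bootstrap run
        timestamps.add(event["ts"])
    return rows, timestamps, completed


rows, timestamps, completed = collect_bootstrap(BOOTSTRAP_MESSAGES)
print(len(rows), timestamps, completed)
```

Waiting for bootstrap-complete before using the rows avoids loading a partial snapshot if the run is interrupted.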