大数据高级开发工程师——大数据相关工具之三 Maxwell
文章目录大数据相关工具Maxwell数据实时同步工具Maxwell 简介MySQL Binlog 介绍1. Binlog简介2. Binlog的日志格式Mysql 实时数据同步方案对比开启MySQL的BinlogMaxwell 安装部署Maxwell 实时采集案例大数据相关工具Maxwell数据实时同步工具Maxwell 简介Maxwell 是一个能实时读取 MySQL 二进制日志文件binlog
·
文章目录
大数据相关工具
Maxwell数据实时同步工具
Maxwell 简介
- Maxwell 是一个能实时读取 MySQL 二进制日志文件binlog,并生成 Json格式的消息,作为生产者发送给 Kafka、Kinesis、RabbitMQ、Redis、Google Cloud Pub/Sub、文件或其它平台的应用程序。它的常见应用场景有ETL、维护缓存、收集表级别的 DML 指标、增量到搜索引擎、数据分区迁移、切库binlog回滚方案等。
- 官网地址:http://maxwells-daemon.io
- GitHub地址:https://github.com/zendesk/maxwell
- Maxwell 主要功能:
- 支持
select * from table
方式进行全量数据初始化; - 支持在主库发生 failover 后,自动恢复 binlog 位置,实现断点续传;
- 可以对数据进行分区,解决数据倾斜问题,发送到 Kafka 的数据支持库、表、列等级别的数据分区;
- 工作方式是伪装为 slave 接收 binlog events,然后根据 schema 信息拼装,可以接受 ddl、xid、row 等event。
- 支持
Maxwell工作原理
- Maxwell工作原理与Canal工作原理一样,都是把自己伪装成MySQL 的slave从库,同步binlog数据,来达到同步MySQL数据,与Canal相比,更加轻量。同样使用Maxwell也需要开启MySQL binlog日志。
MySQL Binlog 介绍
1. Binlog简介
- MySQL 中有以下几种日志:
- 错误日志:记录在启动,运行或停止mysqld时遇到的问题
- 通用日志:记录建立的客户端连接和执行的语句
- 二进制日志(bin log):记录更改数据的语句
- 中继日志:从服务器 复制 主服务器接收的数据更改
- 慢查询日志:记录所有执行时间超过
long_query_time
秒的所有查询或不使用索引的查询 - DDL 日志(元数据日志):元数据操作由 DDL 语句执行
- 在默认情况下,系统仅仅打开错误日志,关闭了其他所有日志,以达到尽可能减少IO损耗提高系统性能的目的,但是在一般稍微重要一点的实际应用场景中,都至少需要打开二进制日志,因为这是MySQL很多存储引擎进行增量备份的基础,也是MySQL实现复制的基本条件。
- MySQL 的二进制日志 binlog 可以说是 MySQL 最重要的日志,它记录了所有的
DDL
和DML
语句(除了数据查询语句select、show等),以事件形式记录,还包含语句所执行的消耗的时间,MySQL的二进制日志是事务安全型的。binlog 的主要目的是复制和恢复。- MySQL主从复制:MySQL Replication 在 Master 端开启 binlog,Master 把它的二进制日志传递给slaves 来达到 master-slave 数据一致的目的。
- 数据恢复:通过使用 mysqlbinlog 工具来使恢复数据。
2. Binlog的日志格式
- 记录在二进制日志中的事件的格式取决于二进制记录格式,支持三种格式类型:
- Statement:基于 SQL 语句的复制(statement-based replication, SBR)
- Row:基于行的复制(row-based replication, RBR)
- Mixed:混合模式复制(mixed-based replication, MBR)
- Statement:
- 每一条会修改数据的sql都会记录在binlog中。
- 优点:不需要记录每一行的变化,减少了binlog日志量,节约了IO,提高了性能。
- 缺点:在进行数据同步的过程中有可能出现数据不一致,比如 update tt set create_date=now(),如果用binlog日志进行恢复,由于执行时间不同可能产生的数据就不同。
- Row:
- 它不记录sql语句上下文相关信息,仅保存哪条记录被修改。
- 优点:保持数据的绝对一致性。因为不管sql是什么,引用了什么函数,它只记录执行后的效果。
- 缺点:每行数据的修改都会记录,最明显的就是update语句,导致更新多少条数据就会产生多少事件,占用较大空间。
- Mixed:
- 从5.1.8版本开始,MySQL提供了Mixed格式,实际上就是Statement与Row的结合。在Mixed模式下,一般的复制使用Statement模式保存binlog,对于Statement模式无法复制的操作使用Row模式保存binlog, MySQL会根据执行的SQL语句选择日志保存方式(因为statement只有sql,没有数据,无法获取原始的变更日志,所以一般建议为Row模式)。
- 优点:节省空间,同时兼顾了一定的一致性。
- 缺点:还有些极个别情况依旧会造成不一致,另外statement和mixed对于需要对binlog的监控的情况都不方便。
Mysql 实时数据同步方案对比
- mysql 数据实时同步可以通过解析mysql的 binlog 的方式来实现,解析binlog可以有多种方式,可以通过canal,或者maxwell等各种方式实现。以下是各种抽取方式的对比介绍:
对比项 | Canal | Maxwell | mysql_streamer |
---|---|---|---|
开源方 | 阿里巴巴 | zendesk | Yelp |
语言 | Java | Java | Python |
活跃度 | 活跃 | 活跃 | 活跃 |
HA | 支持 | 定制 | 支持 |
数据落地 | 定制 | Kafka等 | Kafka |
分区 | 支持 | 支持 | 不支持 |
bootstrap | 不支持 | 支持 | 支持 |
数据格式 | 格式自由 | json(固定) | json(固定) |
文档 | 较详细 | 较详细 | 粗略 |
随机读 | 支持 | 支持 | 支持 |
- 其中
canal
由 Java开发,分为服务端和客户端,拥有众多的衍生应用,性能稳定,功能强大;canal 需要自己编写客户端来消费 canal 解析到的数据。 - Maxwell相对于canal的优势是使用简单,Maxwell比Canal更加轻量级,它直接将数据变更输出为json字符串,不需要再编写客户端。对于缺乏基础建设,短时间内需要快速迭代的项目和公司比较合适。
- 另外
Maxwell
有一个亮点功能,就是Canal只能抓取最新数据,对已存在的历史数据没有办法处理。而Maxwell有一个bootstrap
功能,可以直接引导出完整的历史数据用于初始化,非常好用。
开启MySQL的Binlog
- mysql的版本尽量不要太低,也不要太高,最好使用5.6及以上版本。
- 添加mysql普通用户
maxwell
,因为maxwell这个软件默认用户使用的是maxwell这个用户。
-- 校验级别最低,只校验密码长度
mysql> set global validate_password_policy=LOW;
mysql> set global validate_password_length=6;
-- 创建maxwell库(启动时候会自动创建,不需手动创建)和用户
mysql> CREATE USER 'maxwell'@'%' IDENTIFIED BY '123456';
mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'%';
mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%';
-- 刷新权限
mysql> flush privileges;
- 修改配置文件
sudo vim /etc/my.cnf
,添加或修改以下三行配置:
# binlog日志名称前缀
log-bin=/var/lib/mysql/mysql-bin
# binlog日志格式
binlog-format=ROW
# 唯一标识,这个值的区间是:1到(2^32)-1
server_id=1
- 重启 MySQL 服务
sudo service mysqld restart
- 验证 binlog 是否配置成功
show variables like '%log_bin%';
- 查看binlog日志文件生成:进入
/var/lib/mysql
目录,查看binlog日志文件
Maxwell 安装部署
- 安装包下载&解压:
wget https://github.com/zendesk/maxwell/releases/download/v1.21.1/maxwell-1.21.1.tar.gz
tar -zxvf maxwell-1.21.1.tar.gz -C /bigdata/install
- 修改maxwell配置文件:
cd /bigdata/install/maxwell-1.21.1/
cp config.properties.example config.properties
vim config.properties
- 配置文件
config.properties
内容如下:
# choose where to produce data to
producer=kafka
# list of kafka brokers
kafka.bootstrap.servers=node01:9092,node02:9092,node03:9092
# mysql login info
host=node03
port=3306
user=maxwell
password=123456
# kafka topic to write to
kafka_topic=maxwell
Maxwell 实时采集案例
- 关于 kafka 的安装和使用,请参考我之前的博文:JavaEE 企业级分布式高级架构师(十二)Kafka学习笔记
- 启动kafka集群和zookeeper集群
zkServer.sh start
bin/kafka-server-start.sh -daemon config/server.properties
- 创建 topic
# /usr/apps/kafka
bin/kafka-topics.sh --create --bootstrap-server 192.168.254.128:9092 --replication-factor 2 --partitions 3 --topic maxwell
# 查看topic
bin/kafka-topics.sh --list --bootstrap-server 192.168.254.128:9092
- node01 启动 kafka 自带控制台消费者
bin/kafka-console-consumer.sh --bootstrap-server 192.168.254.128:9092 --topic maxwell --from-beginning
- node03 启动maxwell服务
maxwell
- 插入数据进行测试,向mysql表中插入一条数据,查看kafka是否能够接收到数据
CREATE DATABASE /*!32312 IF NOT EXISTS*/`test_db` /*!40100 DEFAULT CHARACTER SET utf8 */;
USE `test_db`;
/*Table structure for table `user` */
DROP TABLE IF EXISTS `user`;
CREATE TABLE `user` (
`id` varchar(10) NOT NULL,
`name` varchar(10) DEFAULT NULL,
`age` int(11) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
/*Data for the table `user` */
-- 插入数据
insert into `user`(`id`,`name`,`age`) values ('1','xiaokai',20);
-- 修改数据
update `user` set age= 30 where id='1';
-- 删除数据
delete from `user` where id='1';
- 输出 json 数据字段说明:
字段 | 说明 |
---|---|
database | 数据库名称 |
table | 表名称 |
type | 操作类型,包括 insert、update、delete 等 |
ts | 操作时间戳 |
xid | 事务id |
commit | 同一个 xid 代表同一个事务,事务的最后一条语句会有 commit |
data | 最新的数据,修改后的数据 |
old | 旧数据,修改前的数据 |
更多推荐
已为社区贡献1条内容
所有评论(0)