大数据相关工具

Maxwell数据实时同步工具

Maxwell 简介

  • Maxwell 是一个能实时读取 MySQL 二进制日志文件binlog,并生成 Json格式的消息,作为生产者发送给 Kafka、Kinesis、RabbitMQ、Redis、Google Cloud Pub/Sub、文件或其它平台的应用程序。它的常见应用场景有ETL、维护缓存、收集表级别的 DML 指标、增量到搜索引擎、数据分区迁移、切库binlog回滚方案等。
  • 官网地址:http://maxwells-daemon.io
  • GitHub地址:https://github.com/zendesk/maxwell
  • Maxwell 主要功能:
    • 支持 select * from table 方式进行全量数据初始化;
    • 支持在主库发生 failover 后,自动恢复 binlog 位置,实现断点续传;
    • 可以对数据进行分区,解决数据倾斜问题,发送到 Kafka 的数据支持库、表、列等级别的数据分区;
    • 工作方式是伪装为 slave 接收 binlog events,然后根据 schema 信息拼装,可以接受 ddl、xid、row 等event。

Maxwell工作原理

  • Maxwell工作原理与Canal工作原理一样,都是把自己伪装成MySQL 的slave从库,同步binlog数据,来达到同步MySQL数据,与Canal相比,更加轻量。同样使用Maxwell也需要开启MySQL binlog日志。

在这里插入图片描述

MySQL Binlog 介绍

1. Binlog简介
  • MySQL 中有以下几种日志:
    • 错误日志:记录在启动,运行或停止mysqld时遇到的问题
    • 通用日志:记录建立的客户端连接和执行的语句
    • 二进制日志(bin log):记录更改数据的语句
    • 中继日志:从服务器 复制 主服务器接收的数据更改
    • 慢查询日志:记录所有执行时间超过 long_query_time 秒的所有查询或不使用索引的查询
    • DDL 日志(元数据日志):元数据操作由 DDL 语句执行
  • 在默认情况下,系统仅仅打开错误日志,关闭了其他所有日志,以达到尽可能减少IO损耗提高系统性能的目的,但是在一般稍微重要一点的实际应用场景中,都至少需要打开二进制日志,因为这是MySQL很多存储引擎进行增量备份的基础,也是MySQL实现复制的基本条件。
  • MySQL 的二进制日志 binlog 可以说是 MySQL 最重要的日志,它记录了所有的 DDLDML 语句(除了数据查询语句select、show等),以事件形式记录,还包含语句所执行的消耗的时间,MySQL的二进制日志是事务安全型的。binlog 的主要目的是复制和恢复
    • MySQL主从复制:MySQL Replication 在 Master 端开启 binlog,Master 把它的二进制日志传递给slaves 来达到 master-slave 数据一致的目的。
    • 数据恢复:通过使用 mysqlbinlog 工具来使恢复数据。
2. Binlog的日志格式
  • 记录在二进制日志中的事件的格式取决于二进制记录格式,支持三种格式类型:
    • Statement:基于 SQL 语句的复制(statement-based replication, SBR)
    • Row:基于行的复制(row-based replication, RBR)
    • Mixed:混合模式复制(mixed-based replication, MBR)
  • Statement
    • 每一条会修改数据的sql都会记录在binlog中。
    • 优点:不需要记录每一行的变化,减少了binlog日志量,节约了IO,提高了性能。
    • 缺点:在进行数据同步的过程中有可能出现数据不一致,比如 update tt set create_date=now(),如果用binlog日志进行恢复,由于执行时间不同可能产生的数据就不同。
  • Row
    • 它不记录sql语句上下文相关信息,仅保存哪条记录被修改。
    • 优点:保持数据的绝对一致性。因为不管sql是什么,引用了什么函数,它只记录执行后的效果。
    • 缺点:每行数据的修改都会记录,最明显的就是update语句,导致更新多少条数据就会产生多少事件,占用较大空间。
  • Mixed
    • 从5.1.8版本开始,MySQL提供了Mixed格式,实际上就是Statement与Row的结合。在Mixed模式下,一般的复制使用Statement模式保存binlog,对于Statement模式无法复制的操作使用Row模式保存binlog, MySQL会根据执行的SQL语句选择日志保存方式(因为statement只有sql,没有数据,无法获取原始的变更日志,所以一般建议为Row模式)。
    • 优点:节省空间,同时兼顾了一定的一致性。
    • 缺点:还有些极个别情况依旧会造成不一致,另外statement和mixed对于需要对binlog的监控的情况都不方便。

Mysql 实时数据同步方案对比

  • mysql 数据实时同步可以通过解析mysql的 binlog 的方式来实现,解析binlog可以有多种方式,可以通过canal,或者maxwell等各种方式实现。以下是各种抽取方式的对比介绍:
对比项CanalMaxwellmysql_streamer
开源方阿里巴巴zendeskYelp
语言JavaJavaPython
活跃度活跃活跃活跃
HA支持定制支持
数据落地定制Kafka等Kafka
分区支持支持不支持
bootstrap不支持支持支持
数据格式格式自由json(固定)json(固定)
文档较详细较详细粗略
随机读支持支持支持
  • 其中canal 由 Java开发,分为服务端和客户端,拥有众多的衍生应用,性能稳定,功能强大;canal 需要自己编写客户端来消费 canal 解析到的数据。
  • Maxwell相对于canal的优势是使用简单,Maxwell比Canal更加轻量级,它直接将数据变更输出为json字符串,不需要再编写客户端。对于缺乏基础建设,短时间内需要快速迭代的项目和公司比较合适
  • 另外Maxwell 有一个亮点功能,就是Canal只能抓取最新数据,对已存在的历史数据没有办法处理。而Maxwell有一个bootstrap功能,可以直接引导出完整的历史数据用于初始化,非常好用。

开启MySQL的Binlog

  • mysql的版本尽量不要太低,也不要太高,最好使用5.6及以上版本。
  • 添加mysql普通用户maxwell,因为maxwell这个软件默认用户使用的是maxwell这个用户。
-- 校验级别最低,只校验密码长度
mysql> set global validate_password_policy=LOW;
mysql> set global validate_password_length=6;

-- 创建maxwell库(启动时候会自动创建,不需手动创建)和用户
mysql> CREATE USER 'maxwell'@'%' IDENTIFIED BY '123456';
mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'%';
mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%'; 

-- 刷新权限
mysql> flush privileges;
  • 修改配置文件 sudo vim /etc/my.cnf,添加或修改以下三行配置:
# binlog日志名称前缀
log-bin=/var/lib/mysql/mysql-bin
# binlog日志格式
binlog-format=ROW
# 唯一标识,这个值的区间是:1到(2^32)-1
server_id=1
  • 重启 MySQL 服务
sudo service mysqld restart
  • 验证 binlog 是否配置成功
show variables like '%log_bin%';
  • 查看binlog日志文件生成:进入 /var/lib/mysql 目录,查看binlog日志文件

在这里插入图片描述

Maxwell 安装部署

  • 安装包下载&解压:
wget https://github.com/zendesk/maxwell/releases/download/v1.21.1/maxwell-1.21.1.tar.gz
tar -zxvf maxwell-1.21.1.tar.gz -C /bigdata/install
  • 修改maxwell配置文件:
cd /bigdata/install/maxwell-1.21.1/
cp config.properties.example config.properties
vim config.properties
  • 配置文件config.properties 内容如下:
# choose where to produce data to
producer=kafka
# list of kafka brokers
kafka.bootstrap.servers=node01:9092,node02:9092,node03:9092
# mysql login info
host=node03
port=3306
user=maxwell
password=123456
# kafka topic to write to
kafka_topic=maxwell

Maxwell 实时采集案例

zkServer.sh start
bin/kafka-server-start.sh -daemon config/server.properties
  • 创建 topic
# /usr/apps/kafka
bin/kafka-topics.sh --create --bootstrap-server 192.168.254.128:9092 --replication-factor 2 --partitions 3 --topic maxwell
# 查看topic
bin/kafka-topics.sh --list --bootstrap-server 192.168.254.128:9092

在这里插入图片描述

  • node01 启动 kafka 自带控制台消费者
bin/kafka-console-consumer.sh --bootstrap-server 192.168.254.128:9092 --topic maxwell --from-beginning
  • node03 启动maxwell服务
maxwell

在这里插入图片描述

  • 插入数据进行测试,向mysql表中插入一条数据,查看kafka是否能够接收到数据
CREATE DATABASE /*!32312 IF NOT EXISTS*/`test_db` /*!40100 DEFAULT CHARACTER SET utf8 */;

USE `test_db`;

/*Table structure for table `user` */

DROP TABLE IF EXISTS `user`;

CREATE TABLE `user` (
  `id` varchar(10) NOT NULL,
  `name` varchar(10) DEFAULT NULL,
  `age` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

/*Data for the table `user` */
-- 插入数据
insert  into `user`(`id`,`name`,`age`) values  ('1','xiaokai',20);
-- 修改数据
update `user` set age= 30 where id='1';
-- 删除数据
delete from `user` where id='1';

在这里插入图片描述

  • 输出 json 数据字段说明:
字段说明
database数据库名称
table表名称
type操作类型,包括 insert、update、delete 等
ts操作时间戳
xid事务id
commit同一个 xid 代表同一个事务,事务的最后一条语句会有 commit
data最新的数据,修改后的数据
old旧数据,修改前的数据
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐