配置mysql

启用MySQL Binlog

        MySQL服务器的Binlog默认是未开启的,如需进行同步,需要先进行开启

修改MySQL配置文件/etc/my.cnf

sudo vim/etc/my.cof

增加如下配置

注:MySQL Binlog模式

Statement-based基于语句,Binlog会记录所有写操作的SQL语句,包括insert、update、delete等。

优点:节省空间

缺点:有可能造成数据不一致,例如insert语句中包含now()函数。

Row-based基于行,Binlog会记录每次写操作后被操作行记录的变化。

优点:保持数据的绝对一致性。

缺点:占用较大空间。

mixed混合模式,默认是Statement-based,如果SQL语句可能导致数据不一致,就自动切换到Row-based

Maxwell要求Binlog采用Row-based模式

重启MySQL服务

注意:配置完成后一定要重启mysql,不然监控不了

sudo systemctl restart mysqld

创建Maxwell所需数据库和用户

Maxwell需要在MySQL中存储其运行过程中的所需的一些数据,包括binlog同步的断点位置(Maxwell支持断点续传)等等,故需要在MySQL为Maxwell创建数据库及用户

创建数据库

CREATE DATABASE maxwell;

创建Maxwell用户并赋予其必要权限

CREATE USER 'maxwell'@'%' IDENTIFIED BY 'maxwell';

GRANT ALL ON maxwell.* TO 'maxwell'@'%';

GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';

创建完成后会在mysql中显示这个库

配置maxwell

修改Maxwell配置文件名称

cd /opt/module/maxwell

cp config.properties.example config.PROPERTIES

修改Maxwell配置文件

Maxwell的使用

启动Kafka集群

        若Maxwell发送数据的目的地为Kafka集群,则需要先确保Kafka集群为启动状态

Maxwell启停

启动Maxwell

opt/module/maxwell/bin/maxwell --config /opt/module/maxwell/config.properties --daemon

停止Maxwell

ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | awk '{print $2}' | xargs kill -9

Maxwell启停脚本

赋予权限

chmod +x mxw.sh    

启动Maxwell    

mxw.sh start

停止Maxwell

mxw.sh stop

增量数据同步

启动zookeeper,kafka,maxwell。

启动Kafka消费者

kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic topic_db

只要没有配置错,你的mysql中有变更数据,maxwell会实时监控并写入到kafka的主题中

历史数据全量同步

        现在已经实现了使用Maxwell实时增量同步MySQL变更数据的功能。但有时只有增量数据是不够的,我们可能需要使用到MySQL数据库中从历史至今的一个完整的数据集。这就需要我们在进行增量同步之前,先进行一次历史数据的全量同步。这样就能保证得到一个完整的数据集。

Maxwell-bootstrap

        Maxwell提供了bootstrap功能来进行历史数据的全量同步,命令如下

 /opt/module/maxwell/bin/maxwell-bootstrap --database 你的库名 --table 你的表名 --config /opt/module/maxwell/config.properties

bootstrap数据格式

采用bootstrap方式同步的输出数据格式如下:

{

    "database": "fooDB",

    "table": "barTable",

    "type": "bootstrap-start",

    "ts": 1450557744,

    "data": {}

}

{

    "database": "fooDB",

    "table": "barTable",

    "type": "bootstrap-insert",

    "ts": 1450557744,

    "data": {

        "txt": "hello"

    }

}

{

    "database": "fooDB",

    "table": "barTable",

    "type": "bootstrap-insert",

    "ts": 1450557744,

    "data": {

        "txt": "bootstrap!"

    }

}

{

    "database": "fooDB",

    "table": "barTable",

    "type": "bootstrap-complete",

    "ts": 1450557744,

    "data": {}

}

注意事项:

(1)第一条type为bootstrap-start和最后一条type为bootstrap-complete的数据,是bootstrap开始和结束的标志,不包含数据,中间的type为bootstrap-insert的数据才包含数据。

(2)一次bootstrap输出的所有记录的ts都相同,为bootstrap开始的时间

全量同步脚本

在命令行使用全量同步有点麻烦,我们可以写一个脚本。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐