Debezium:mysql connector使用
前言:mysql connector支持功能:1、数据流式处理2、支持多种模式mysql集群的数据处理3、支持流复制异常中断后的数据自动恢复4、支持DDL级别操作的数据流处理备注:本文以kafka单节点开发环境为例,多节点分布式连接器相似,参考:https://blog.csdn.net/u012551524/article/details/83349324插件版本:...
前言:
mysql connector支持功能:
1、数据流式处理
2、支持多种模式mysql集群的数据处理
3、支持流复制异常中断后的数据自动恢复
4、支持DDL级别操作的数据流处理
备注:本文以kafka单节点开发环境为例,多节点分布式连接器相似,参考:https://blog.csdn.net/u012551524/article/details/83349324
插件版本:
Kafka:CDK3.10 (相当于Kafka1.1版本),这里需要kafka 0.10以上版本才能支持
Debezium:0.83
Mysql:5.5 (mysql5.6版本前后会有一些差异,下面会提到)
环境安装参考:https://blog.csdn.net/u012551524/article/details/84258521
基本参数:
name=mysql-connector //连接器名字
slot.name=debezium //mysql流复制插槽名字
connector.class=io.debezium.connector.mysql.MySqlConnector //引用的连接器类
database.hostname=localhost //mysql服务器地址
database.port=3306 //端口号
database.user=debezium //安装配置时赋予权限的用户名
database.password=debezium //安装配置时赋予权限的密码
database.server.id=223344 //mysql安装配置时的服务ID
database.history.kafka.bootstrap.servers=BigData-Dev-5:9092 //kafka服务节点
database.server.name=debezium //数据库服务名
database.whitelist=debezium //接受连接器流复制的数据库白名单
database.history.kafka.topic=dbhistory.debezium //存放database history checkpoit
include.schema.changes=true //监控schema change
将配置写入新建配置文件:postgres2.properties
启动kafka连接器:
$KAFKA_CON/connect-standalone.sh /opt/cloudera/parcels/KAFKA/etc/kafka/conf.dist/connect-standalone.properties /opt/cloudera/parcels/KAFKA/etc/kafka/conf.dist/postgres2.properties
测试:
在mysql插入数据,然后在kafka客户端消费有数据变动的表所对应的topic(topic形式:database.server.name+database.name+table.name)
./kafka-console-consumer.sh --zookeeper localhost:2181 --topic debezium.debezium.test
关于DDL操作流监控:
kafka存放DDL变动的topic(名字:database.server.name)
./kafka-console-consumer.sh --zookeeper localhost:2181 --topic debezium --from-beginning
遇到的问题:
1、WARN [AdminClient clientId=mysql-connector-dbhistory] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient:241)
如果有遇到这个问题,可以查看Kafka服务域名IP是否又问题,查看方式:./zookeeper-shell.sh localhost:2181 <<< "get /brokers/ids/150" (150:为brokerID,这个根据自己的设置来),查看返回结果
WATCHER::
WatchedEvent state:SyncConnected type:None path:null {"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://BigData-Dev-5:9092"],"jmx_port":9393,"host":"BigData-Dev-5","timestamp":"1542189462697","port":9092,"version":4}
cZxid = 0x13000028d9
ctime = Wed Nov 14 17:57:42 CST 2018
mZxid = 0x13000028d9
mtime = Wed Nov 14 17:57:42 CST 2018
pZxid = 0x13000028d9
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x56710c51c11018d
dataLength = 198
numChildren = 0
查看PLAINTEXT://BigData-Dev-5:9092与自己设置的database.history.kafka.bootstrap.servers是否一致
2、org.apache.kafka.connect.errors.ConnectException: The MySQL server is not configured to use a row-level binlog, which is required for this connector to work properly. Change the MySQL configuration to use a row-level binlog and restart the connector.
遇到这类问题直接审查一下自己的mysql配置是否生效
3、binlog_row_image 这个参数无效
官网给出要设置这个参数,这个参数是mysql5.6之后的新参数,如果5.6之前可以不用设置,效果一样
更多推荐
所有评论(0)