canal-adapter使用,MySQL到kafka到MySQL
一、背景介绍canal-adapter是canal的客户端适配器,能够直接将canal同步的数据写入到目标数据库(hbase,rdb,es),rdb是关系型数据库比如MySQL、Oracle、PostgresSQL和SQLServer等,比较的快捷方便。本文以MySQL-->canalServer-->kafka-->canalAdaper-->MySQL为例介绍can..
·
一、背景介绍
canal-adapter是canal的客户端适配器,能够直接将canal同步的数据写入到目标数据库(hbase,rdb,es),rdb是关系型数据库比如MySQL、Oracle、PostgresSQL和SQLServer等,比较的快捷方便。本文以MySQL-->canalServer-->kafka-->canalAdaper-->MySQL为例介绍canal-adapter的使用。
前提条件:
- canal.deployer-1.1.4已经安装,在canal1节点。安装教程
- MySQL数据库两个mysql1和mysql2
- kafka的版本2.11,在kafka1,kafka2,kafka3节点上
- zookeeper版本,在zk1节点上
二、下载配置
- 下载canal.adapter-1.1.4 解压到canal-adapter目录里面:tar -zxvf canal.adapter-1.1.4.tar.gz -C canal-adapter
- 配置文件: vim canal-adapter/conf/application.yml
server:
port: 8081
logging:
level:
com.alibaba.otter.canal.client.adapter.rdb: INFO #日志级别默认是DEBUG
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
default-property-inclusion: non_null
canal.conf:
mode: kafka # kafka rocketMQ tcp
canalServerHost: canal1:11111
zookeeperHosts: zk1:2181
mqServers: kafka1:9092,kafka2:9092,kafka3:9092 #or rocketmq
flatMessage: false #开启了数据压缩格式,protobuf
batchSize: 500
syncBatchSize: 1000
retries: 0
timeout:
accessKey:
secretKey:
srcDataSources:
defaultDS:
url: jdbc:mysql://mysql1:3306/testOrg?useUnicode=true
username: rootOrg
password: rootOrg
canalAdapters:
- instance: topic1 # canal 实例的名字后者是kafka的topic名字
groups:
- groupId: g1
outerAdapters:
- name: rdb
key: mysql1
properties:
jdbc.driverClassName: com.mysql.jdbc.Driver
jdbc.url: jdbc:mysql://mysql2:3306/test?useUnicode=true
jdbc.username: root
jdbc.password: root2020
3.编辑 : vim canal-adapter/conf/rdb/mytest_user.yml
dataSourceKey: defaultDS #和application里面的srcDataSources参数一致
destination: topic1 #canal的实例名字或者是kafka的topic名字
groupId: g1 #和application里面的一致
outerAdapterKey: mysql1 #和application里面的一致
concurrent: true
dbMapping:
database: testOrg #源MySQL的数据库
table: orders #源MySQL的org的表orders
targetTable: test.orders #目标数据库和表
targetPk:
id: id #MySQL的主键
mapAll: true #全量映射
# targetColumns: #部分映射
# id:
# name:
# role_id:
# c_time:
# test1:
# etlCondition: "where c_time>={}" # 简单的etl处理
commitBatch: 100 # 批量提交的大小
三、启动验证
1、启动:sh bin/start.sh
2、到mysql2的test.orders上查询数据
3、查询状态:curl http://127.0.0.1:8081/destinations
四、高级用法
五、问题总结
有问题交流,请加个人主页群
更多推荐
所有评论(0)