kafka—confluent connector使用
一、kafka-connector-jdbc1、共性设置#connector名称name=test-mysql-rsyncconnector.class=io.confluent.connect.jdbc.JdbcSourceConnectortasks.max=1#数据库连接、用户、密码connection.url=jdbc:mysql://127.0.0.1:3306/rsyn...
·
0、命令
./bin/confluent stop
./bin/confluent start
./bin/confluent load source-ssouser -d /data/app/confluent/etc/kafka-connect-jdbc/mysql-source-ssouser.properties
./bin/confluent load source-ssouser-jg -d /data/app/confluent/etc/kafka-connect-jdbc/mysql-source-ssouser-jg.properties
./bin/confluent load source-applog -d /data/app/confluent/etc/kafka-connect-jdbc/mysql-source-applog.properties
一、kafka-connector-jdbc
1、共性设置
#connector名称
name=test-mysql-rsync
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
#数据库连接、用户、密码
connection.url=jdbc:mysql://127.0.0.1:3306/rsync?useUnicode=true&characterEncoding=UTF-8&useSSL=false&autoReconnect=true&failOverReadOnly=false
connection.user=
connection.password=
dialect.name=MySqlDatabaseDialect
#白名单,只同步白名单中的表
table.whitelist=库名.表名
#数据更新模式,按自增与时间戳,设定对应的字段。
mode=timestamp+incrementing
incrementing.column.name=id
timestamp.column.name=update_time
#topic前缀
topic.prefix=test-mysql-jdbc-
validate.non.null=false
#支持插入与更新
insert.mode=upsert
pk.mode = record_value
#主键字段
pk.fields = id
delete.enabled=true
#key value 序列化
key.converter.schema.registry.url=http://localhost:8081
value.converter.schema.registry.url=http://localhost:8081
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.storage.StringConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
transforms=createKey
transforms.createKey.fields=userId,updateTime
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
2、模式A—按表结构同步
#必须设定白名单
table.whitelist=库名.表名
#这里设定的是最终topic的前缀
topic.prefix=test-mysql-jdbc-
3、模式B—按Query sql语句同步
#query模式下,不可以设定白名单
~~#table.whitelist=user~~
#query语句
query=SELECT userid,nickname,phone,createTime,effectivetime,expiretime FROM user
#topic名称,query模式下,topic是完整名称
topic.prefix=test-jdbc-user
4、启用
#装载connector
./bin/confluent load test-mysql-rsync -d %confluent_path%/etc/kafka-connect-jdbc/source-mysql-rsync.properties
#卸载connector
./bin/confluent unload test-mysql-rsync -d %confluent_path%/etc/kafka-connect-jdbc/source-mysql-rsync.properties
二、kafka-connector-elasticsearch
name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=test-mysql-jdbc-user,applog
key.ignore=false
value.ignore=true
schema.ignore=true
connection.url=http://localhost:8607
#topics.key.ignore=id
type.name=kafka-connect
transforms=InsertKey,ExtractId
transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.InsertKey.fields=id
transforms.ExtractId.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.ExtractId.field=id
三、常用命令
1、查询topic offset
./kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 -topic xboss_event --time -1
# --time [-1:获取最新offset, -2:获取最旧offset]
2、
更多推荐
已为社区贡献1条内容
所有评论(0)