0、命令

./bin/confluent stop
./bin/confluent start

./bin/confluent load source-ssouser -d /data/app/confluent/etc/kafka-connect-jdbc/mysql-source-ssouser.properties 
./bin/confluent load source-ssouser-jg -d /data/app/confluent/etc/kafka-connect-jdbc/mysql-source-ssouser-jg.properties 
./bin/confluent load source-applog -d /data/app/confluent/etc/kafka-connect-jdbc/mysql-source-applog.properties 

一、kafka-connector-jdbc
1、共性设置

#connector名称
name=test-mysql-rsync
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1

#数据库连接、用户、密码
connection.url=jdbc:mysql://127.0.0.1:3306/rsync?useUnicode=true&characterEncoding=UTF-8&useSSL=false&autoReconnect=true&failOverReadOnly=false
connection.user=
connection.password=
dialect.name=MySqlDatabaseDialect
#白名单,只同步白名单中的表
table.whitelist=库名.表名
#数据更新模式,按自增与时间戳,设定对应的字段。
mode=timestamp+incrementing
incrementing.column.name=id
timestamp.column.name=update_time
#topic前缀
topic.prefix=test-mysql-jdbc-

validate.non.null=false
#支持插入与更新
insert.mode=upsert
pk.mode = record_value
#主键字段
pk.fields = id
delete.enabled=true
#key value 序列化

key.converter.schema.registry.url=http://localhost:8081
value.converter.schema.registry.url=http://localhost:8081
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.storage.StringConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
transforms=createKey
transforms.createKey.fields=userId,updateTime
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey

2、模式A—按表结构同步

#必须设定白名单
table.whitelist=库名.表名
#这里设定的是最终topic的前缀
topic.prefix=test-mysql-jdbc-

3、模式B—按Query sql语句同步

#query模式下,不可以设定白名单
~~#table.whitelist=user~~ 
#query语句
query=SELECT userid,nickname,phone,createTime,effectivetime,expiretime FROM user

#topic名称,query模式下,topic是完整名称
topic.prefix=test-jdbc-user

4、启用

#装载connector
./bin/confluent load test-mysql-rsync -d %confluent_path%/etc/kafka-connect-jdbc/source-mysql-rsync.properties

#卸载connector
./bin/confluent unload test-mysql-rsync -d %confluent_path%/etc/kafka-connect-jdbc/source-mysql-rsync.properties

二、kafka-connector-elasticsearch


name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=test-mysql-jdbc-user,applog
key.ignore=false
value.ignore=true
schema.ignore=true
connection.url=http://localhost:8607
#topics.key.ignore=id
type.name=kafka-connect
transforms=InsertKey,ExtractId
transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.InsertKey.fields=id    
transforms.ExtractId.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.ExtractId.field=id

三、常用命令

1、查询topic offset

./kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 -topic xboss_event --time -1
#  --time [-1:获取最新offset, -2:获取最旧offset]

2、

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐