kafka-connect——数据导入导出——搭建单机模式
目录(1)配置linux中connect内存分配:(2)配置connect-standalone参数:A、broker访问地址:B、offset存储路径:C、plugin插件路径:(3)配置数据来源参数:A、配置jdbc来源参数:B、配置file来源参数:C、官网插件查找:(4)配置数据目的参数:A、配置jdbc目的参数:B、配置file目的参数:...
目录
(1)配置linux中connect内存分配:
修改kafka安装路径下的bin/connect-standalone.sh文件,可以修改分配给connect的内存:
(2)配置connect-standalone参数:
A、broker访问地址:
参数:bootstrap.servers,值为单个kafka访问地址。
B、offset存储路径:
参数:offset.storage.file.filename,值为offset文件存储路径。
C、plugin插件路径:
参数:plugin.path,值为connect.class的插件保存路径。
(3)配置数据来源参数:
A、配置jdbc来源参数:
参数:name,配置连接名称,唯一即可。
参数:connect.class,配置连接器类型,jdb应该为:io.confluent.connect.jdbc.JdbcSourceConnector。
参数:tasks.max,配置最大task数量。
参数:connection.url,配置jdbc连接地址。例如:
jdbc:mysql://localhost:3306/A?user=***&password=***
参数:table.whitelist,配置需要访问的表,多张表用英文逗号分隔。
参数:mode,配置模型,分为三种incrementing(自增)、timestamp(时间戳)、timestamp+incrementing(自增+时间戳)。
参数:incrementing.column.name,配置自增字段,如果没有自增就不需要。
参数:timestamp.column.name,配置时间戳字段,如果没有时间戳就不需要。
参数:topic.prefix,配置kafka中topic名称前缀,topic全部名称为:前缀+表名称。
B、配置file来源参数:
参数:name,配置连接名称,唯一即可。
参数:connect.class,配置连接器类型,file应该为:FileStreamSource。
参数:tasks.max,配置最大task数量。
参数:topic,配置kafka中topic名称。
参数:file,配置file存储文件路径。
C、官网插件查找:
可以百度:confluent,然后product---Confluent Hub,搜索对应需要的connect插件,然后查看document中对应文档即可。
(4)配置数据目的参数:
A、配置jdbc目的参数:
参数:name,配置连接名称,唯一即可。
参数:connect.class,配置连接器类型,jdb应该为:io.confluent.connect.jdbc.JdbcSinkConnector。
参数:topics,配置kafka中topic名称。
参数:tasks.max,配置最大task数量。
参数:connection.url,配置jdbc连接地址。
参数:auto.create,配置是否自动创建表,如果自动创建表,那么创建表的名称为topic名称。
参数:insert.mode,配置数据入表方式,更新或插入。值有upsert、insert。
参数:pk.mode = record_value,参数:pk.fields,两个同时存在,指定主键ID字段名称,用来指定kafka中存储数据指定了主键,方便数据导出时如果需要做增量同步数据。
参数:table.name.format,配置表表名称。
B、配置file目的参数:
参数:name,配置连接名称,唯一即可。
参数:connect.class,配置连接器类型,file应该为:FileStreamSink。
参数:tasks.max,配置最大task数量。
参数:topic,配置kafka中topic名称。
参数:file,配置file存储文件路径。
C、官网插件查找:
可以百度:confluent,然后product---Confluent Hub,搜索对应需要的connect插件,然后查看document中对应文档即可。
(5)检查connect.class:
检查下connect-standalone.properties配置文件中配置的plugin.path中是否有source、sink中配置的对应connect.class的插件。如果没有就按照connect支持链接中在网上进行下载对应的插件,然后解压到plugin.path指定的文件夹中即可。
(6)检查jdbc等链接驱动:
如果是jdbc等链接方式的,需要提前检查kafka安装路径下的libs中是否有对应的访问驱动jar,如果没有需要手动上传一个对应的驱动jar包。
(7)启动connect单机:
A、windows启动:
connect-standalone.bat ../config/connect-standalone.properties ../config/connect-source.properties ../config/connect-sink.properties
说明:
connect-standalone.properties:为单机connect配置文件。
connect-source.properties:为自定义的数据来源配置文件,文件名可变的是。
connect-sink.properties:为自定义的数据目的配置文件,文件名可变的是。
B、Linux启动:
./connect-standalone.sh ../config/connect-standalone.properties ../config/connect-source.properties ../config/connect-sink.properties
说明:
connect-standalone.properties:为单机connect配置文件。
connect-source.properties:为自定义的数据来源配置文件,文件名可变的是。
connect-sink.properties:为自定义的数据目的配置文件,文件名可变的是。
(8)动态管理connect:
在connect单机模式下,也支持通过REST API来动态管理connect,但是通过REST API动态管理的connect配置信息只是临时保存,一旦connect的进程关闭再重启后刚才动态管理配置的信息将不存在。
(9)测试connect:
在数据来源的地方动态的添加数据,在kafka对应的topic中,以及对应的数据目的的地方可以看到新添加的数据。这样的结果就表示通过kafka-connect可以实时的同步数据了。
更多推荐
所有评论(0)