目录

(1)配置linux中connect内存分配:

(2)配置connect-standalone参数:

A、broker访问地址:

B、offset存储路径:

C、plugin插件路径:

(3)配置数据来源参数:

A、配置jdbc来源参数:

B、配置file来源参数:

C、官网插件查找:

(4)配置数据目的参数:

A、配置jdbc目的参数:

B、配置file目的参数:

C、官网插件查找:

(5)检查connect.class:

(6)检查jdbc等链接驱动:

(7)启动connect单机:

A、windows启动:

B、Linux启动:

(8)动态管理connect:

(9)测试connect:


 

(1)配置linux中connect内存分配:

修改kafka安装路径下的bin/connect-standalone.sh文件,可以修改分配给connect的内存:

(2)配置connect-standalone参数:

A、broker访问地址:

参数:bootstrap.servers,值为单个kafka访问地址。

B、offset存储路径:

参数:offset.storage.file.filename,值为offset文件存储路径。

C、plugin插件路径:

参数:plugin.path,值为connect.class的插件保存路径。

(3)配置数据来源参数:

A、配置jdbc来源参数:

参数:name,配置连接名称,唯一即可。

参数:connect.class,配置连接器类型,jdb应该为:io.confluent.connect.jdbc.JdbcSourceConnector。

参数:tasks.max,配置最大task数量。

参数:connection.url,配置jdbc连接地址。例如:

jdbc:mysql://localhost:3306/A?user=***&password=***

参数:table.whitelist,配置需要访问的表,多张表用英文逗号分隔。

参数:mode,配置模型,分为三种incrementing(自增)、timestamp(时间戳)、timestamp+incrementing(自增+时间戳)。

参数:incrementing.column.name,配置自增字段,如果没有自增就不需要。

参数:timestamp.column.name,配置时间戳字段,如果没有时间戳就不需要。

参数:topic.prefix,配置kafka中topic名称前缀,topic全部名称为:前缀+表名称。

B、配置file来源参数:

参数:name,配置连接名称,唯一即可。

参数:connect.class,配置连接器类型,file应该为:FileStreamSource。

参数:tasks.max,配置最大task数量。

参数:topic,配置kafka中topic名称。

参数:file,配置file存储文件路径。

C、官网插件查找:

可以百度:confluent,然后product---Confluent Hub,搜索对应需要的connect插件,然后查看document中对应文档即可。

(4)配置数据目的参数:

A、配置jdbc目的参数:

参数:name,配置连接名称,唯一即可。

参数:connect.class,配置连接器类型,jdb应该为:io.confluent.connect.jdbc.JdbcSinkConnector。

参数:topics,配置kafka中topic名称。

参数:tasks.max,配置最大task数量。

参数:connection.url,配置jdbc连接地址。

参数:auto.create,配置是否自动创建表,如果自动创建表,那么创建表的名称为topic名称。

参数:insert.mode,配置数据入表方式,更新或插入。值有upsert、insert。

参数:pk.mode = record_value,参数:pk.fields,两个同时存在,指定主键ID字段名称,用来指定kafka中存储数据指定了主键,方便数据导出时如果需要做增量同步数据。

参数:table.name.format,配置表表名称。

B、配置file目的参数:

参数:name,配置连接名称,唯一即可。

参数:connect.class,配置连接器类型,file应该为:FileStreamSink。

参数:tasks.max,配置最大task数量。

参数:topic,配置kafka中topic名称。

参数:file,配置file存储文件路径。

C、官网插件查找:

可以百度:confluent,然后product---Confluent Hub,搜索对应需要的connect插件,然后查看document中对应文档即可。

(5)检查connect.class:

检查下connect-standalone.properties配置文件中配置的plugin.path中是否有source、sink中配置的对应connect.class的插件。如果没有就按照connect支持链接中在网上进行下载对应的插件,然后解压到plugin.path指定的文件夹中即可。

(6)检查jdbc等链接驱动:

如果是jdbc等链接方式的,需要提前检查kafka安装路径下的libs中是否有对应的访问驱动jar,如果没有需要手动上传一个对应的驱动jar包。

(7)启动connect单机:

A、windows启动:

connect-standalone.bat ../config/connect-standalone.properties ../config/connect-source.properties ../config/connect-sink.properties

说明:

connect-standalone.properties:为单机connect配置文件。

connect-source.properties:为自定义的数据来源配置文件,文件名可变的是。

connect-sink.properties:为自定义的数据目的配置文件,文件名可变的是。

B、Linux启动:

./connect-standalone.sh ../config/connect-standalone.properties ../config/connect-source.properties ../config/connect-sink.properties

说明:

connect-standalone.properties:为单机connect配置文件。

connect-source.properties:为自定义的数据来源配置文件,文件名可变的是。

connect-sink.properties:为自定义的数据目的配置文件,文件名可变的是。

(8)动态管理connect:

在connect单机模式下,也支持通过REST API来动态管理connect,但是通过REST API动态管理的connect配置信息只是临时保存,一旦connect的进程关闭再重启后刚才动态管理配置的信息将不存在。

(9)测试connect:

在数据来源的地方动态的添加数据,在kafka对应的topic中,以及对应的数据目的的地方可以看到新添加的数据。这样的结果就表示通过kafka-connect可以实时的同步数据了。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐