在很多场景下,需要将数据同步其他数据源进行计算,本文介绍通过debezium和kafka-connect将postgres数据同步到kafka中。

首先下载debezium,官网地址: https://debezium.io/
目前稳定版本是1.9.5,这是postgres对应的kafka-connect插件下载地址:

debezium-connector-postgres-1.9.5.Final-plugin.tar.gz

然后再kafka的目录下,新建一个plugins目录,将下载的包解压到这个目录:
在这里插入图片描述
然后我们配置kafka-coinect,我们这里以集群模式为例:

 vim config/connect-distributed.properties
 

主要调整如下几个地方:

# kafka集群地址
bootstrap.servers=node1:9092,node2:9092,node3:9092
# kafka-connect插件位置
plugin.path=/data1/service/kafka-2.7.2/plugins
#可以适当调整下面这个值,默认是 10000
offset.flush.timeout.ms=100000

然后我们就可以启动kafka-connect:

bin/connect-distributed.sh -daemon config/connect-distributed.properties

到这里,kafka-connect,就启动了起来。
kafka-connect提供了http restfule的接口供我们取操作,默认的端口地址是8083,常见如下:

GET /connector-plugins获取当前所有插件名称
GET /connectors获取当前所有connector
POST /connector添加一个connector
GET /connectors/{name}获取指定的connector的信息
GET /connectors/{name}/config获取指定的connector的配置信息
PUT /connectors/{name}/config更新connector的配置
GET /connectors/{name}/status获取指定connector的装填
GET /connectors/{name}/tasks/获取指定connector正在运行的task
GET /connectors/{name}/tasks/tasks/{taskid}/status获取connector的task状态信息
PUT /connectors/{name}/pause暂停connector和他运行task
POST /connectors/{name}/restart重启connector
POST /connectors/{name}/tasks/{taskid}/restart重启一个connector的task
DELETE /connectors/.{name}删除一个connector,停止关联的task并删除配置

在psotgres数据库侧,我们需要调整一下参数:

wal_level = logical
max_wal_senders = 2000
max_replication_slots = 2000
#下面两个参数可以根据需要调整
wal_sender_timeout = 60s
wal_receiver_timeout = 60s

配置完之后需要重启。
另外一点,如果PG版本比较老的话,需要装

  • decoderbufs(由Debezium社区维护,基于ProtoBuf)
  • wal2json(由wal2json社区维护,基于JSON)

而在PG10+默认自带pgoutput可以不用安装,我这里的是基于PG12,所以不用安装。

到这里所有准备工作就做好了,接下来就是想kafka-connect中添加connector了:

{
    "name": "prod-material-642",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.dbname": "ad_dissectorprofile_000642",
        "database.user": "postgres",
        "slot.name": "prodmaterial642",
        "tasks.max": "1",
        "database.hostname": "localhost",
        "database.password": "postgres",
        "name": "prod-material-642",
        "database.server.name": "prod-material-642",
        "database.port": "5432",
        "plugin.name": "pgoutput",
        "table.whitelist": "public.ad_entity,public.campaign,public.media_entity,public.url_scheme,public.schedule_entity"
    }
}

这里添加完之后,我们可以查看天剑的connector和其对应的任务,然后再kafka中,会生成如下几个topic:

prod-material-642.public.ad_entity
prod-material-642.public.campaign
prod-material-642.public.media_entity
prod-material-642.public.schedule_entity
prod-material-642.public.url_scheme

而写入到topic中的数据内容大致如下:

{
    "before":null,
    "after": {
        "pk":"1",
        "value":"New data"
    },
    "source": {
        ...
        "snapshot":"incremental" 
    },
    "op":"r", 
    "ts_ms":"1620393591654",
    "transaction":null
}

如果需要详细内容,可以看下debezium官网上对于postgres同步的详细介绍:

Debezium connector for PostgreSQL

后续我们就可以消费对应的topic来进行相关的数据同步处理即可

另外,需要注意的是,默认情况下,这里debezium是针对每个库、每个schema、每个表都生成一个topic,如果表比较多,那么topic数量将会特别多,为此debezium可以对写入的topic进行重新路由:

{
    "name": "prod-material-642",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.dbname": "ad_dissectorprofile_000642",
        "database.user": "postgres",
        "slot.name": "prodmaterial642",
        "tasks.max": "1",
        "database.hostname": "localhost",
        "database.password": "postgres",
        "name": "prod-material-642",
        "database.server.name": "prod-material-642",
        "database.port": "5432",
        "plugin.name": "pgoutput",
        "table.whitelist": "public.ad_entity,public.campaign,public.media_entity,public.url_scheme,public.schedule_entity",
         "transforms": "Combine",
        "transforms.Combine.type": "io.debezium.transforms.ByLogicalTableRouter",
        "transforms.Combine.topic.regex": "(.*)",
        "transforms.Combine.topic.replacement": "prod-material-sync"
    }
}

如上,这样会将所有的同步数据都写入到prod-material-sync这一个topic中去。详细可以参考官网:
topic-routing

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐