应用debezium将postgresql数据送至kafka(官网示例 本地docker部署)
debezium官网示例,将postgresql变化数据送至kafka,docker部署
·
版本
conncet 2.2
postgresql 15.2
1 postgresql
1.1 获取
docker pull debezium/example-postgres
1.2 运行
docker run -d --name postgres -p 5432:5432 -e POSTGRES_PASSWORD=postgres debezium/example-postgres
1.3 特殊配置
观察镜像中/var/lib/postgresql/data下postgresql.conf,最关键的配置为:
shared_preload_libraries = 'decoderbufs'
wal_level = logical
示例中使用解码器为decoderbufs,对应创建connect时plugin.name
默认值即为decoderbufs。postgresql 10版本后自带pgoutput,创建connect时可以将plugin.name
设置为pgoutput。
2 zookeeper
2.1 获取
docker pull debezium/zookeeper
2.2 运行
docker run -it -d --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:latest
3 kafka
3.1 获取
docker pull debezium/kafka
3.2 运行
docker run -it -d --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:latest
3.3 检查
如果本地已有kafka客户端,可以使用如下命令查看已有topic:
bin/kafka-topics.sh --bootstrap-server 172.17.0.1:9092 --list
connect
4.1 获取
docker pull debezium/connect
4.2 启动
docker run -d --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link postgres:postgres debezium/connect
4.3 创建connect
创建pgsql-inventory-connector.json文件,通过database的hostname、port确定postgresql。此处使用postgresql账号,因为这个账号具有replication权限。
{
"name": "localhost-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "172.17.0.1",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "postgres",
"topic.prefix": "dbserver1",
"table.include.list": "inventory.customers"
}
}
通过http创建connect
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 172.17.0.1:8083/connectors/ -d @pgsql-inventory-connector.json
4.4 测试命令
curl -H "Accept:application/json" 172.17.0.1:8083
curl -H "Accept:application/json" 172.17.0.1:8083/connectors
curl -i -X GET -H "Accept:application/json" 172.17.0.1:8083/connectors/localhost-connector
5 测试
5.1 kafka
创建消费者
bin/kafka-console-consumer.sh --topic dbserver1.inventory.customers --from-beginning --bootstrap-server 172.17.0.1:9092
5.2 postgresql
insert into inventory.customers values (1005,'aA','bB','aAbB@home.com');
5.3 kafka显示结果
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1005,"first_name":"aA","last_name":"bB","email":"aAbB@home.com"},"source":{"version":"2.2.0.Alpha3","connector":"postgresql","name":"dbserver1","ts_ms":1687946054175,"snapshot":"false","db":"postgres","sequence":"[\"34244288\",\"34244576\"]","schema":"inventory","table":"customers","txId":758,"lsn":34244576,"xmin":null},"op":"c","ts_ms":1687946054536,"transaction":null}}
附录-图形界面
debezium-ui
docker run -d --name debezium-ui -p 8080:8080 -e KAFKA_CONNECT_URIS=http://172.17.0.1:8083 debezium/debezium-ui:latest
kakfa-ui
待研究
更多推荐
已为社区贡献2条内容
所有评论(0)