Debezium: Kafka Connector Configuration
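Goal: build a real-time data warehouse on HBase
Problem addressed: real-time data capture from an RDBMS into HBase
Approach: PostgreSQL -----> Debezium -----> Kafka ------> Spark Streaming ------> Phoenix
This article: covers the configuration of the Kafka connector
Official references:
Debezium: https://debezium.io/docs/connectors/postgresql/
Kafka: Apache Kafka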
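Kafka provides two ways to run connectors (the examples connect to PostgreSQL through Debezium):
1. connect-standalone.sh: for development and testing
- Command: ./connect-standalone.sh /opt/cloudera/parcels/KAFKA/etc/kafka/conf.dist/connect-standalone.properties /opt/cloudera/parcels/KAFKA/etc/kafka/conf.dist/postgres.properties
- Standalone mode takes two configuration files as arguments:
The first is the worker configuration file connect-standalone.properties; change bootstrap.servers=10.0.10.94:9092 (the default is localhost).
The second is the connector configuration file postgres.properties, which defines the connector: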
name=test-connector
slot.name=debezium
connector.class=io.debezium.connector.postgresql.PostgresConnector
database.hostname=10.0.10.76
database.port=5432
database.user=postgres
database.password=postgres
database.dbname=test
database.history.kafka.bootstrap.servers=localhost:9092
database.server.name=test
table.whitelist=public.test2
plugin.name=wal2json
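The same key/value pairs are reused in distributed mode, where they are sent as a JSON body instead of a file. As a rough sketch of that mapping, the snippet below turns properties text into the `name` plus `config` object that POST /connectors expects. The function name `properties_to_connector_json` is made up for illustration, and the parser is deliberately simple: it assumes one `key=value` per line and does not handle line continuations or `:` separators.

```python
import json


def properties_to_connector_json(text):
    """Turn connector .properties text into a POST /connectors request body."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        # Skip blank lines and comments.
        if not line or line.startswith(("#", "!")):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    # The REST API expects "name" at the top level and the rest under "config".
    name = props.pop("name")
    return {"name": name, "config": props}


# Shortened copy of the postgres.properties content shown above.
example = """\
name=test-connector
connector.class=io.debezium.connector.postgresql.PostgresConnector
database.hostname=10.0.10.76
table.whitelist=public.test2
"""

body = properties_to_connector_json(example)
print(json.dumps(body, indent=2))
```

Keeping one properties file as the source of truth and deriving the JSON from it avoids the two copies drifting apart between test and production.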
2. connect-distributed.sh: for production
- Command: ./connect-distributed.sh /opt/cloudera/parcels/KAFKA/etc/kafka/conf.dist/connect-distributed.properties
As shown, distributed mode takes no connector configuration file; connectors are registered afterwards through the REST API.
- The worker configuration file connect-distributed.properties needs the same change: bootstrap.servers=10.0.10.94:9092
- Distributed mode needs topics in which to store connector state:
config.storage.topic=connect-configs stores the connector configurations
./kafka-topics.sh --create --zookeeper localhost:2181 --topic connect-configs --replication-factor 3 --partitions 1
offset.storage.topic=connect-offsets stores the offsets
./kafka-topics.sh --create --zookeeper localhost:2181 --topic connect-offsets --replication-factor 3 --partitions 50
status.storage.topic=connect-status stores the connector status
./kafka-topics.sh --create --zookeeper localhost:2181 --topic connect-status --replication-factor 3 --partitions 10
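The three commands differ only in topic name and partition count, so they can be generated from a small table. The sketch below does that and also adds `--config cleanup.policy=compact`, which is an addition not present in the commands above: the Kafka Connect documentation recommends that these internal topics be compacted. The helper name `create_topic_command` is hypothetical, and the `--zookeeper` flag is kept as in the original commands (newer Kafka releases use `--bootstrap-server` instead).

```python
# Topic name -> partition count, as used in the commands above.
INTERNAL_TOPICS = {
    "connect-configs": 1,
    "connect-offsets": 50,
    "connect-status": 10,
}


def create_topic_command(topic, partitions, zookeeper="localhost:2181", replication=3):
    """Build a kafka-topics.sh argument list for one compacted internal topic."""
    return [
        "./kafka-topics.sh", "--create",
        "--zookeeper", zookeeper,
        "--topic", topic,
        "--replication-factor", str(replication),
        "--partitions", str(partitions),
        # Assumption: log compaction, as the Kafka Connect docs recommend.
        "--config", "cleanup.policy=compact",
    ]


commands = [create_topic_command(t, p) for t, p in INTERNAL_TOPICS.items()]
for cmd in commands:
    # Print only; run the lines by hand or pass each list to subprocess.run.
    print(" ".join(cmd))
```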
- Commonly used REST API endpoints in distributed mode:
GET /connectors – return a list of active connectors
POST /connectors – create a new connector; the request body should be a JSON object containing a string name field and an object config field with the connector configuration parameters
GET /connectors/{name} – get information about a specific connector
GET /connectors/{name}/config – get the configuration parameters for a specific connector
PUT /connectors/{name}/config – update the configuration parameters for a specific connector
GET /connectors/{name}/status – get current status of the connector, including if it is running, failed, paused, etc., which worker it is assigned to, error information if it has failed, and the state of all its tasks
GET /connectors/{name}/tasks – get a list of tasks currently running for a connector
GET /connectors/{name}/tasks/{taskid}/status – get current status of the task, including if it is running, failed, paused, etc., which worker it is assigned to, and error information if it has failed
PUT /connectors/{name}/pause – pause the connector and its tasks, which stops message processing until the connector is resumed
PUT /connectors/{name}/resume – resume a paused connector (or do nothing if the connector is not paused)
POST /connectors/{name}/restart – restart a connector (typically because it has failed)
POST /connectors/{name}/tasks/{taskId}/restart – restart an individual task (typically because it has failed)
DELETE /connectors/{name} – delete a connector, halting all tasks and deleting its configuration
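All of these endpoints follow the same pattern (an HTTP method, a path, and an optional JSON body), so a thin wrapper is enough to script them. The following is a minimal sketch using only the Python standard library; `build_request` and `call` are illustrative names, and the base URL assumes a worker listening on localhost:8083 as in the curl examples in this article.

```python
import json
import urllib.request

BASE_URL = "http://localhost:8083"  # assumption: worker REST address


def build_request(method, path, body=None):
    """Build (but do not send) a request against the Kafka Connect REST API."""
    data = None if body is None else json.dumps(body).encode("utf-8")
    req = urllib.request.Request(BASE_URL + path, data=data, method=method)
    req.add_header("Accept", "application/json")
    if data is not None:
        req.add_header("Content-Type", "application/json")
    return req


def call(req):
    """Send the request and decode the JSON response, if there is one."""
    with urllib.request.urlopen(req) as resp:
        payload = resp.read()
    return json.loads(payload) if payload else None


list_req = build_request("GET", "/connectors")
pause_req = build_request("PUT", "/connectors/test-connector/pause")
# call(list_req) would return the connector names once a worker is running.
```

Separating request construction from sending makes it possible to inspect or log a request before it reaches the cluster.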
- Examples:
Create a new connector:
curl 'http://localhost:8083/connectors' -X POST -i -H "Content-Type:application/json" -d '{
"name": "test-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "10.0.10.76",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "test",
"database.history.kafka.bootstrap.servers":"localhost:9092",
"database.server.name": "test",
"table.whitelist": "public.test2",
"plugin.name":"wal2json"
}
}'
List connectors:
curl -X GET 'http://localhost:8083/connectors'
Restart a connector:
curl -X POST 'http://localhost:8083/connectors/test-connector/restart'
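In many Kafka versions, restarting a connector this way does not restart its tasks, so a failed task may need its own restart call. As a hedged sketch, the helper below reads a status response (the shape returned by GET /connectors/{name}/status) and lists the task restart paths to POST to. The sample response is hypothetical, and the function names are illustrative.

```python
def failed_task_ids(status):
    """Return the ids of tasks whose state is FAILED in a status response."""
    return [t["id"] for t in status.get("tasks", []) if t.get("state") == "FAILED"]


def restart_paths(name, status):
    """List the REST paths to POST to in order to restart each failed task."""
    return [f"/connectors/{name}/tasks/{tid}/restart" for tid in failed_task_ids(status)]


# Hypothetical response from GET /connectors/test-connector/status.
sample_status = {
    "name": "test-connector",
    "connector": {"state": "RUNNING", "worker_id": "10.0.10.94:8083"},
    "tasks": [
        {"id": 0, "state": "FAILED", "worker_id": "10.0.10.94:8083", "trace": "..."},
    ],
}

paths = restart_paths("test-connector", sample_status)
print(paths)
```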
Update a connector's configuration:
curl -X PUT 'http://localhost:8083/connectors/test-connector/config' -i -H "Content-Type:application/json" -d '
{
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "10.0.10.76",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "test",
"database.history.kafka.bootstrap.servers":"localhost:9092",
"database.server.name": "test",
"table.whitelist": "public.test2,public.nt_sale_order_test",
"decimal.handling.mode": "string",
"plugin.name":"wal2json"
}'
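Note that PUT /connectors/{name}/config replaces the whole configuration, which is why the request above repeats every key rather than sending only the changed ones. A small sketch of that read-modify-write step follows, assuming the current configuration has already been fetched as a dict (for example from GET /connectors/{name}/config); `with_extra_table` is a made-up helper name.

```python
def with_extra_table(config, table):
    """Return a copy of config whose table.whitelist also contains table."""
    current = [t.strip() for t in config.get("table.whitelist", "").split(",") if t.strip()]
    if table not in current:
        current.append(table)
    updated = dict(config)  # leave the caller's dict untouched
    updated["table.whitelist"] = ",".join(current)
    return updated


# Shortened copy of the configuration used in this article.
old_config = {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "table.whitelist": "public.test2",
    "plugin.name": "wal2json",
}
new_config = with_extra_table(old_config, "public.nt_sale_order_test")
print(new_config["table.whitelist"])
```

The returned dict is the complete body to send with the PUT request, so no existing setting is dropped by accident.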