Greenplum Kafka 集成
本文主要介绍如何配置和使用Greenplum-Kafka集成1.介绍Greenplum-Kafka集成使用Greenplum Stream Server来提供从Apache Kafka到Greenplum数据库的高速并行数据加载入库。Greenplum-Kafka集成包括gpkafka工具,gpkafka工具是kafka的comsumer。gpkafka支持两个命令:gpkafka...
·
本文主要介绍如何配置和使用Greenplum-Kafka集成
1.介绍
Greenplum-Kafka集成使用Greenplum Stream Server来提供从Apache Kafka到Greenplum数据库的高速并行数据加载入库。
Greenplum-Kafka集成包括gpkafka工具,gpkafka工具是kafka的comsumer。gpkafka支持两个命令:
- gpkafka load:加载kafka数据到greenplum集群中的表中;
- gpkafka check:检查加载操作的提交历史记录;
gpkafka加载数据过程:首先接收来自单个Kafka主题的流数据,并将数据先插入到外部表中,再使用Greenplum数据库可读外部表将数据转换并插入到目标Greenplum表中。
局限性
Greenplum-Kafka集成目前支持:从单个Kafka主题加载到单个Greenplum数据库表。在启动加载任务前必须预先创建好Greenplum表。
2.加载Kafka数据到Greenplum
2.1 配置gpkafka.yaml
DATABASE: ops
USER: gpadmin
PASSWORD: changeme
HOST: mdw-1
PORT: 5432
VERSION: 2
KAFKA:
INPUT:
SOURCE:
BROKERS: kbrokerhost1:9092
TOPIC: customer_expenses2
VALUE:
COLUMNS:
- NAME: c1
TYPE: json
FORMAT: avro
AVRO_OPTION:
SCHEMA_REGISTRY_ADDR: http://localhost:8081
KEY:
COLUMNS:
- NAME: key
TYPE: json
FORMAT: avro
AVRO_OPTION:
SCHEMA_REGISTRY_ADDR: http://localhost:8081
ERROR_LIMIT: 25
OUTPUT:
SCHEMA: payables
TABLE: expenses2
MAPPING:
- NAME: customer_id
EXPRESSION: (c1->>'cust_id')::int
- NAME: newcust
EXPRESSION: ((c1->>'cust_id')::int > 5000000)::boolean
- NAME: expenses
EXPRESSION: (c1->>'expenses')::decimal
- NAME: tax_due
EXPRESSION: ((c1->>'expenses')::decimal * .075)::decimal
METADATA:
SCHEMA: gpkafka_internal
COMMIT:
MAX_ROW: 1000
MINIMAL_INTERVAL: 30000
POLL:
BATCHSIZE: 100
TIMEOUT: 3000
2.2运行命令加载数据
gpkafka load loadcfg.yaml
2.3可以使用gpkafka check 检查加载进度
gpkafka check loadcfg.yaml
更多推荐
已为社区贡献3条内容
所有评论(0)