本文主要介绍如何配置和使用Greenplum-Kafka集成

1.介绍

Greenplum-Kafka集成使用Greenplum Stream Server来提供从Apache Kafka到Greenplum数据库的高速并行数据加载入库。

Greenplum-Kafka集成包括gpkafka工具,gpkafka工具是kafka的comsumer。gpkafka支持两个命令:

  • gpkafka load:加载kafka数据到greenplum集群中的表中;
  • gpkafka check:检查加载操作的提交历史记录;

gpkafka加载数据过程:首先接收来自单个Kafka主题的流数据,并将数据先插入到外部表中,再使用Greenplum数据库可读外部表将数据转换并插入到目标Greenplum表中。

局限性

Greenplum-Kafka集成目前支持:从单个Kafka主题加载到单个Greenplum数据库表。在启动加载任务前必须预先创建好Greenplum表。

2.加载Kafka数据到Greenplum

2.1 配置gpkafka.yaml

DATABASE: ops
USER: gpadmin
PASSWORD: changeme
HOST: mdw-1
PORT: 5432
VERSION: 2
KAFKA:
   INPUT:
      SOURCE:
         BROKERS: kbrokerhost1:9092
         TOPIC: customer_expenses2
      VALUE:
         COLUMNS:
           - NAME: c1
             TYPE: json
         FORMAT: avro
         AVRO_OPTION:
           SCHEMA_REGISTRY_ADDR: http://localhost:8081
      KEY:
         COLUMNS:
           - NAME: key
             TYPE: json
         FORMAT: avro
         AVRO_OPTION:
           SCHEMA_REGISTRY_ADDR: http://localhost:8081
      ERROR_LIMIT: 25
   OUTPUT:
      SCHEMA: payables
      TABLE: expenses2
      MAPPING:
        - NAME: customer_id
          EXPRESSION: (c1->>'cust_id')::int
        - NAME: newcust
          EXPRESSION: ((c1->>'cust_id')::int > 5000000)::boolean
        - NAME: expenses
          EXPRESSION: (c1->>'expenses')::decimal
        - NAME: tax_due
          EXPRESSION: ((c1->>'expenses')::decimal * .075)::decimal
   METADATA:
      SCHEMA: gpkafka_internal
   COMMIT:
      MAX_ROW: 1000
      MINIMAL_INTERVAL: 30000
   POLL:
      BATCHSIZE: 100
      TIMEOUT: 3000

 

2.2运行命令加载数据

gpkafka load loadcfg.yaml

2.3可以使用gpkafka check 检查加载进度

gpkafka check loadcfg.yaml

 

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐