Kafka源表的实现来源于自社区的kafka版本实现。

注意:本文档只适合独享模式下使用。

Kafka需要定义的DDL如下。

 
   
  1. create table kafka_stream(
  2. messageKey VARBINARY,
  3. `message` VARBINARY,
  4. topic varchar,
  5. `partition` int,
  6. `offset` bigint
  7. ) with (
  8. type ='kafka010',
  9. topic = 'xxx',
  10. `group.id` = 'xxx',
  11. bootstrap.servers = 'ip:端口,ip:端口,ip:端口'
  12. );

注意:以上表中的五个字段顺序务必保持一致。

WITH参数

通用配置

参数注释说明备注
typeKafka对应版本推荐使用KAFKA010
topic读取的单个topictopic名称

必选配置

(1)kafka08必选配置:

参数注释说明备注
group.id消费组id
zookeeper.connectzk链接地址zk连接id

(2)kafka09/kafka010/kafka011必选配置:

参数注释说明备注
group.id消费组id
bootstrap.serverskafka集群地址kafka集群地址

Kafka集群地址:

如果您的kafka是阿里云商业版,请参考kafka商业版准备配置文档。

如果您的kafka是阿里云公测版,请参考kafka公测版准备配置文档。

可选配置

 
    
  1. "consumer.id","socket.timeout.ms","fetch.message.max.bytes","num.consumer.fetchers","auto.commit.enable","auto.commit.interval.ms","queued.max.message.chunks", "rebalance.max.retries","fetch.min.bytes","fetch.wait.max.ms","rebalance.backoff.ms","refresh.leader.backoff.ms","auto.offset.reset","consumer.timeout.ms","exclude.internal.topics","partition.assignment.strategy","client.id","zookeeper.session.timeout.ms","zookeeper.connection.timeout.ms","zookeeper.sync.time.ms","offsets.storage","offsets.channel.backoff.ms","offsets.channel.socket.timeout.ms","offsets.commit.max.retries","dual.commit.enabled","partition.assignment.strategy","socket.receive.buffer.bytes","fetch.min.bytes"

注意:其它可选配置项参考kafka官方文档:
Kafka09
https://kafka.apache.org/0110/documentation.html#consumerconfigs
Kafka010
https://kafka.apache.org/090/documentation.html#newconsumerconfigs
Kafka011
https://kafka.apache.org/0102/documentation.html#newconsumerconfigs

kafka版本对应关系

TypeKafka 版本
Kafka080.8.22
Kafka090.9.0.1
Kafka0100.10.2.1
Kafka0110.11.0.2

Kafka消息解析

默认Kafka读到的消息:

 
    
  1. messageKey varbianry,
  2. message varbianry,
  3. topic varchar,
  4. partition int,
  5. offset bigint

这样一个五元组,如果您希望在source阶段把数据parser成特定的其它格式,可以按照下面实践进行。

参数注释说明备注
parserUdtf自定义解析函数用于解析从kafka读到的消息映射到ddl具体对应的类型

如何写一个parserUdtf参见自定义表值函数(UDTF)

自建kafka

与阿里云Kafka消息队列一样,DDL定义相同。

示例:

 
    
  1. create table kafka_stream(
  2. messageKey VARBINARY,
  3. `message` VARBINARY,
  4. topic varchar,
  5. `partition` int,
  6. `offset` bigint
  7. ) with (
  8. type ='kafka011',
  9. topic = 'kafka_01',
  10. `group.id` = 'CID_blink',
  11. bootstrap.servers = '192.168.0.251:9092'
  12. );

WITH参数

关于自建Kafka的with参数,请参考本文档Kafka创建时DDL的with参数说明。需要注意的是 bootstrap.servers参数需要填写自建的地址和端口号。

注意:无论是阿里云Kafka还是自建Kafka,目前实时计算均无Tps、Rps等指标信息。在作业上线之后,运维界面暂时不支持显示指标信息。

本文转自实时计算——创建消息队列(Kafka)源表

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐