1、kettle的kafka生产者叫kafka producer,nifi中的相应处理器为PublishKafka,如下图所示:

可以很清楚的看到PublishKafka处理器支持多个版本的kafka,选择时要根据自己的kafka 版本选择相匹配的PublishKafka处理器,由于本人使用的是kafka2.x,所以这里选择PublishKafka_2_0处理器。

处理器xxxRecord和xxx的区别,如PublishKafka和PublishKafkaRecord:

处理器xxx:将整个 FlowFile 中的内容作为一个消息进行发送,

xxxxxxRecord:将 FlowFile 中的内容拆分成多个记录(或行),然后将每个记录作为一个单独的消息进行发送。

2、flow demo 设计,如下图所示:

GenerateFlowFile:产生自定义数据“我是java小金刚”

PublishKafka:将“我是java小金刚”写入kafka。

LogMessage:打印消息。

 3、处理器PublishKafka属性配置,如下图所示:

 Kafka Brokers:以 host:port 格式表示的 Kafka Broker,集群列表通过逗号,如192.168.101.5:9092,192.168.101.6:9092

Security Protocol: 用于与代理通信的安全协议。对应于 Kafka 客户端的 security.protocol 属性。这里我们使用默认值PLAINTEXT

SASL Mechanism:用于认证的 SASL 机制。对应于 Kafka 客户端的 sasl.mechanism 属性。这里我们使用默认值GSSAPI

Kerberos Service Name:与broker JAAS 配置中配置的 Kafka 服务器的主要名称匹配的服务名称。

Kerberos Credentials Service:支持使用 Kerberos 进行通用凭证认证的服务。

Kerberos Principal:用于与 Kerberos 进行身份验证的主体。

Kerberos Keytab:用于与 Kerberos 进行身份验证的 Keytab 凭据,这个属性需要提供一个文件。

SSL Context Service:支持与 Kafka brokers 进行 SSL 通信的服务。

Topic Name:要发布到的 Kafka 主题的名称。

Delivery Guarantee:指定确保消息发送到 Kafka 的要求。对应于 Kafka 的 'acks' 属性。根据具体情况选择,这里使用默认值。

Failure Strategy:如果处理器无法将数据发布到 Kafka,指定处理流文件的方式。有Route to Failure 、Rollback 两个选项。

Use Transactions:指定在与 Kafka 通信时,NiFi 是否应提供事务性保证。

 Transactional Id Prefix:当 "Use Transaction" 设置为 true 时,KafkaProducer 配置 'transactional.id' 将生成一个 UUID,并以此字符串为前缀。

Attributes to Send as Headers (Regex):匹配所有 FlowFile 属性名称的正则表达式。任何名称与 regex 匹配的属性将作为标头添加到 Kafka 消息中。如果未指定,则不会将任何 FlowFile 属性添加为标头。

Message Header Encoding:对于作为消息标头添加的任何属性,通过 <Attributes to Send as Headers> 属性配置,此属性指示用于序列化标头的字符编码。

Kafka Key:消息的密钥。如果未指定,则如果存在流文件属性 'kafka.key',则将其用作消息密钥。请注意,同时设置 Kafka 密钥和分隔符可能会导致具有相同密钥的许多 Kafka 消息。通常情况下,这不是问题,因为 Kafka 不强制执行或假设消息和密钥的唯一性。但是,同时设置分隔符和 Kafka 密钥可能会导致 Kafka 上的数据丢失风险。在 Kafka 上进行主题压缩期间,将根据此密钥对消息进行去重。

Key Attribute Encoding:发出的FlowFiles具有一个名为'kafka.key'的属性。此属性规定了该属性值应如何编码。这是使用默认值。

Message Demarcator:指定用于在单个FlowFile中分隔多个消息的字符串(解释为UTF-8)。如果未指定,则将整个FlowFile内容用作单个消息。如果指定了分隔符,则将FlowFile内容拆分为此分隔符,并将每个部分作为单独的Kafka消息发送。要输入特殊字符,如'换行符',请使用CTRL+Enter或Shift+Enter,取决于您的操作系统。

Max Request Size:请求的最大大小,以字节为单位。对应于Kafka的'max.request.size'属性,默认为1 MB(1048576字节)。

Acknowledgment Wait Time:发送消息到Kafka后,这指示我们愿意等待从Kafka收到响应的时间量。如果Kafka在此时间段内未确认消息,FlowFile将被路由到'failure'。

Max Metadata Wait Time:发布者在执行“发送”调用期间等待获取元数据或等待缓冲区在“发送”调用之前刷新的时间量,超过此时间将导致整个“发送”调用失败。对应于Kafka的'max.block.ms'属性。

Partitioner class:指定用于计算消息分区ID的类。对应于Kafka的'partitioner.class'属性。

Partition:指定哪些分区记录将会发送消息。

Compression Type:这个参数允许你为此生产者生成的所有数据指定压缩编解码器。

动态属性:如kafka用户名和密码:Username、Password。

4、运行nifi flow,通过kafka-ui 查看kafka中的数据,如下图所示:

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐