ntroduction to librdkafka - the Apache Kafka C/C++ client library

librdkafka 是一个C实现的高性能 Apache Kafka 客户端,为生产环境提供了一个可靠和高性能的客户端。
librdkafka 同样也提供了传统的 C++ 接口。

目录

以下目录适用于本文

  • 性能
    • 性能数据
    • 高吞吐
    • 低延时
    • 压缩
  • 消息可靠性
  • 用法
    • 文档
    • 初始化
    • 配置
    • 线程和回调
    • Brokers
    • Producer API
    • Consumer API
  • 附录
    • 测试详情

性能

librdkafka 是一个基于现代硬件设计的多线程库, 并且试图保持最少的内存拷贝。
如果应用程序愿意,生产和消费消息的载体可以不通过任何拷贝实现让消息大小不受限制。

librdkafka 同样适用于高吞吐还是低延时的场景,都可以通过属性配置接口来满足。

下面是两个对于性能调节非常重要的属性:

  • batch.num.messages - 发送消息集前,本地队列等待累计的最小消息数量。
  • queue.buffering.max.ms - 等待 batch.num.messages 数量消息填充本地队列的最长等待时间。

性能数据

接下来的性能测试数据受限于如下配置:

  • Intel Quad Core i7 at 3.4GHz, 8GB of memory
  • 影响磁盘性能的 brokers 刷新参数配置如下:
    • log.flush.interval.messages=10000000
    • log.flush.interval.ms=100000
  • 两个 brokers 运行在和 librdkafka 的同一台机器上。
  • 每个 topic 有两个 partition。
  • 每个 broker 被一个 partition 领导。
  • 使用 examples 子目录下 rdkafka_performance 程序。

测试结果

  • Test1: 2 brokers, 2 partitions, required.acks=2, 100 byte messages:
    850000 messages/second85 MB/second

  • Test2: 1 broker, 1 partition, required.acks=0, 100 byte messages:
    710000 messages/second71 MB/second

  • Test3: 2 broker2, 2 partitions, required.acks=2, 100 byte messages,
    snappy compression:
    300000 messages/second30 MB/second

  • Test4: 2 broker2, 2 partitions, required.acks=2, 100 byte messages,
    gzip compression:
    230000 messages/second23 MB/second

提示: 要了解命令的执行等,请参考本文最后的 测试详情 章节。

提示: 消费者的性能测试将会尽快公布。

高吞吐

高吞吐的关键是消息的批量处理,等待本地队列累计一定数量的消息,然后用一个大的消息集或批量发送到对端。
通过这种方式补偿通讯开销和消除往返时延(RTT)的不利影响。

默认配置 batch.num.messages=1000 和 queue.buffering.max.ms=1000 适用于高吞吐量场景。
这样配置允许 librdkafka 等待 1000 ms 让本地队列累计 1000 条消息,然后发送累计的消息到 broker。

这些配置是全局的 (rd_kafka_conf_t),但是基于每一个 topic+partition 上应用。

低延时

当要求发送低延时,“queue.buffering.max.ms”应该调整到尽可能适用于生产侧低延时。
设置“queue.buffering.max.ms” 为 1 来确保消息尽可能快的发送。
你可以查看如何降低消息延时来获取更多细节。

压缩

生产者消息压缩通过配置“compression.codec”属性生效。

压缩通过本地队列批量处理消息实现,批量越大越可能获得更高的压缩率。
本地批量队列大小通过“batch.num.messages”和“queue.buffering.max.ms”配置属性控制,已经在前面的高吞吐章节描述过了。

消息可靠性

消息可靠性是 librdkafka 的一个重要因素。
通过指定的配置(“request.required.acks”和“message.send.max.retries”,等)应用程序完全可以信赖 librdkafka 来分发消息。

如果 topic 配置属性“request.required.acks”设置为等待来自 broker 的消息提交确认(非0,详见CONFIGURATION.md),librdkafka 将会等待消直到所有期望的 ack 都收到,并优雅的处理下列事件:

  • Broker 连接失败
  • Topic 领导变更
  • 来自 broker 的生产者错误信号

这些事件都是 librdkafka 自动处理的。对与上面的事件,应用程序无须做任何处理。
失败消息将会重新发送“message.send.max.retries”次,然后返回失败报告给应用程序。

发送失败报告回调函数用于 librdkafka 给应用程序发送消息返回的状态信号,每个消息发送后的消息状态报告都会调用一次回调函数:

  • 如果error_code是非零,那么消息发送失败,error_code 表示失败类型(rd_kafka_resp_err_t枚举)。
  • 如果error_code是零,那么消息发送成功。

发送报告回调函数的更多使用详情查看生产者 API。

发送报告回调函数是可配置的。

用法

文档

librdkafka API 记录在rdkafka.h
头文件中,配置属性记录在CONFIGURATION.md中。

初始化

应用程序需要初始化一个顶层对象(rd_kafka_t)的基础容器,用于全局配置和共享状态。
通过调用rd_kafka_new()创建。

还需要实例化一个或多个 topic(rd_kafka_topic_t)用于生产或消费。
topic 对象保存 topic 级别的属性,并且维护一个映射,
该映射保存所有可用 partition 和他们的领导 broker 。
通过调用rd_kafka_topic_new()创建。

rd_kafka_t 和 rd_kafka_topic_t都源于可选的配置 API。
不使用该 API 将导致 librdkafka 使用列在文档CONFIGURATION.md中的默认配置。

提示:一个应用程序可以创建多个rd_kafka_t对象,它们不共享状态。

提示:一个rd_kafka_topic_t对象只能用于一个rd_kafka_t对象的创建。

配置

为了与官方的 Apache Kafka 软件一致和降低学习门槛,
librdkafka 使用了和 Apache Kafka 官方客户端完全一致的配置属性。

在创建对象之前,通过rd_kafka_conf_set() 和 rd_kafka_topic_conf_set() API
来应用配置。

提示:一旦通过rd_kafka.._new()使用过,rd_kafka.._conf_t 对象不能再重复使用。
调用rd_kafka.._new()后,应用程序不需要释放任何配置资源。

示例

rd_kafka_conf_t *conf;
char errstr[512];

conf = rd_kafka_conf_new();
rd_kafka_conf_set(conf, "compression.codec", "snappy", errstr, sizeof(errstr));
rd_kafka_conf_set(conf, "batch.num.messages", "100", errstr, sizeof(errstr));

rd_kafka_new(RD_KAFKA_PRODUCER, conf);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

线程和回调

为了完全利用的现代硬件,librdkafka 本身支持多线程。
API 是完全线程安全。在任何时间和其任何线程中,应用程序都可以调用任何 API 函数。

一个基于调查的 API 被用于向应用程序返回信号。
应用程序需要定期调用rd_kafka_poll()。
这个调查 API 将调用以下配置的回调函数(可选):

  • 消息发送报告回调函数 - 标示一个消息被发送或发送失败,应用程序可以做出处理货释放消息内使用应用资源。
  • 错误回调函数 - 标示一个错误。这些错误通常是一个信息类别,比如连接 broker 错误,
    应用程序通常不需要做任何处理。错误的类型在 rd_kafka_resp_err_t 枚举中指定,
    包括远程 broker 错误和本地错误。

可选的回调函数不是通过调查出发,它们可能被任何线程调用:

  • 日志回调函数 - 允许应用程序输出 librdkafka 生成的日志消息。
  • 分区回调函数 - 应用程序提供消息分区的方法。在任何时候任何线程中,分区函数都可能被调用,
    并可能被使用同一个键值调用多次。
    分区函数约束:
    • 不得调用任何 rd_kafka_*() 类型函数
    • 不得阻塞或执行长时间的函数
    • 必须返回一个 0 到 partition_cnt-1 之间的值,
      或分区无法执行时返回特殊的 RD_KAFKA_PARTITION_UA 值

Brokers

librdkafka 只需要初始化一个 broker 列表(至少一个)来调用 broker 引导。
librdkafka 会连接所有列在“metadata.broker.list”属性中或调用rd_kafka_brokers_add()添加的 broker 引导,并且查询列表中每一的元数据信息,包括 broker、topic、partition 和 kafka 集群的领导者。

Broker 名字类似于“host[:port]”,其中端口是可选的(默认 9092),host是可用的主机名或IPv4、IPv6地址。
如果是一个复杂地址,librdkafka会循环地址尝试连接。
一个 DNS 记录用于包含所有可用于引导的 broker 地址。

新特性

Apache Kafka broker 版本 0.10.0 新增了一个 ApiVersionRequest API,允许客户端查询 broker 支持的 API 版本。

librdkafka 支持这个特性,会查询每一个 broker 获取该信息(如果api.version.request=true),根据该信息生效或失效各种特性,如MessageVersion 1 (timestamps), KafkaConsumer等。

如果 broker 没有对 librdkafka 的 ApiVersionRequest 请求作出正确响应,会认为 broker 版本太老不支持该 API,并回退到老版本 broker 的 API。回退的版本可配置到 librdkafka 中,通过broker.version.fallback属性控制。

Producer API

通过RD_KAFKA_PRODUCER类型设置好rd_kafka_t对象后,一个或多个rd_kafka_topic_t对象就准备好接收消息,并组装和发送到 broker。

rd_kafka_produce()函数接受如下参数:

  • rkt - 生产的 topic,之前通过rd_kafka_topic_new()生成
  • partition - 生产的 partition。如果设置为RD_KAFKA_PARTITION_UA(未赋值的),需要通过配置分区函数去选择一个确定 partition。
  • msgflags - 0 或下面的值:

    • RD_KAFKA_MSG_F_COPY - librdkafka 将立即从 payload 做一份拷贝。如果 payload 是不稳定存储,如栈,需要使用这个参数。
    • RD_KAFKA_MSG_F_FREE - 当 payload 使用完后,让 librdkafka 使用free(3)释放。

    这两个标志互斥,如果都不设置,payload 既不会被拷贝也不会被 librdkafka 释放。

    如果RD_KAFKA_MSG_F_COPY标志不设置,就会有数据拷贝,librdkafka 将占用 payload 指针直到消息被发送或失败。
    librdkafka 处理完消息后,会调用发送报告回调函数,让应用程序重新获取 payload 的所有权。
    如果设置了RD_KAFKA_MSG_F_FREE,应用程序就不要在发送报告回调函数中释放 payload。

  • payload,len - 消息 payload
  • key,keylen - 可选的消息键,用于分区。将会用于 topic 分区回调函数,如果有,会附加到消息中发送给 broker。
  • msg_opaque - 可选的,应用程序为每个消息提供的无类型指针,提供给消息发送回调函数,用于应用程序引用。

rd_kafka_produce() 是一个非阻塞 API,该函数会将消息塞入一个内部队列并立即返回。
如果队列中的消息数超过queue.buffering.max.messages属性配置的值,rd_kafka_produce()通过返回 -1,并在ENOBUFS中设置错误码来反馈错误。

提示: 见 examples/rdkafka_performance.c 获取生产者的使用。

Simple Consumer API (legacy)

提示:要获取 high-level KafkaConsumer 接口,查看 rd_kafka_subscribe (rdkafka.h) 或 KafkaConsumer (rdkafkacpp.h)

消费者 API 比生产者 API 更复杂。
通过RD_KAFKA_CONSUMER类型创建rd_kafka_t对象并实例化rd_kafka_topic_t后,应用程序还需要调用rd_kafka_consume_start()指定partition。

rd_kafka_consume_start()参数:

  • rkt - 要消费的 topic ,之前通过rd_kafka_topic_new()创建。
  • partition - 要消费的 partition。
  • offset - 消费开始的消息偏移量。可以是绝对的值或两中特殊的偏移量:
    • RD_KAFKA_OFFSET_BEGINNING 从该 partition 的队列的最开始消费(最老的消息)。
    • RD_KAFKA_OFFSET_END 从该 partition 产生的下一个消息开始消费。
    • RD_KAFKA_OFFSET_STORED 使用偏移量存储。

当一个 topic+partition 消费者被启动,librdkafka 不断尝试从 broker 批量获取消息来保持本地队列有queued.min.messages数量的消息。

本地消息队列通过 3 个不同的消费 API 向应用程序提供服务:

  • rd_kafka_consume() - 消费一个消息
  • rd_kafka_consume_batch() - 消费一个或多个消息
  • rd_kafka_consume_callback() - 消费本地队列中的所有消息,且每一个都调用回调函数

这三个 API 的性能按照升序排列,rd_kafka_consume()最慢,rd_kafka_consume_callback()最快。不同的类型满足不同的应用需要。

被上述函数消费的消息通过rd_kafka_message_t类型返回。

rd_kafka_message_t的成员:
err - 返回给应用程序的错误信号。如果该值不是零,payload字段应该是一个错误的消息,err是一个错误码(rd_kafka_resp_err_t)。
rkt,partition - 消息的 topic 和 partition 或错误。
payload,len - 消息的数据或错误的消息 (err!=0)。
key,key_len - 可选的消息键,生产者指定。
offset - 消息偏移量。

不管是payloadkey的内存,还是整个消息,都由 librdkafka 所拥有,且在rd_kafka_message_destroy()被调用后不要使用。
librdkafka 为了避免消息集的多余拷贝,会为所有从内存缓存中接收的消息共享同一个消息集,这意味着如果应用程序保留单个rd_kafka_message_t,将会阻止内存释放并用于同一个消息集的其他消息。

当应用程序从一个 topic+partition 中消费完消息,应该调用rd_kafka_consume_stop()来结束消费。该函数同时会清空当前本地队列中的所有消息。

提示: 见 examples/rdkafka_performance.c 获取消费者的使用。

偏移量管理

当 broker 版本大于等于 0.9.0 时,基于 broker 的偏移量管理可通过使用 high-level KafkaConsumer 接口(见rdkafka.h 或 rdkafkacpp.h)实现。

偏移量管理同样也可以通过本地偏移量文件存储,按每一个topic+partition,偏移量定时写入本地文件。通过下列 topic 属性配置:

  • auto.commit.enable
  • auto.commit.interval.ms
  • offset.store.path
  • offset.store.sync.interval.ms

当前还不支持 zookeeper 的偏移量管理。

消费者组

基于消费者组的 broker 是支持的(要求 Apache Kafka broker >=0.9)。见 rdkafka.h 或 rdkafkacpp.h 中的 KafkaConsumer。

Topics

Topic 自动创建

librdkafka 支持自动创建 topic。broker 需要配置auto.create.topics.enable=true

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐