CONFIGURATION.md

全局配置属性

属性C/P范围默认值描述
builtin.features* gzip, snappy, ssl, sasl, regex, lz4标示该librdkafka的支持的内建特性。应用程序可以查看或设置这些值来检查是否支持这些特性。
Type: CSV flags
client.id* rdkafka客户端标示。Type: string
metadata.broker.list*  初始化的broker列表。应用程序也可以使用 rd_kafka_brokers_add() 在运行时添加 broker。
Type: string
bootstrap.servers*  参考 metadata.broker.list
message.max.bytes*1000 .. 10000000001000000最大发送消息大小。
Type: integer
message.copy.max.bytes*0 .. 100000000065535消息拷贝到缓存的最大大小。如果消息大于这个值,将会消耗更多的iovec而采用引用(零拷贝)方式。
Type: integer
receive.message.max.bytes*1000 .. 1000000000100000000最大接收消息大小。这是一个安全预防措施,防止协议饱和时内存耗尽。这个值至少为 fetch.message.max.bytes * 消费者分区数 + 消息头大小 (e.g. 200000 bytes). 
Type: integer
max.in.flight.requests.per.connection*1 .. 10000001000000客户端保持的最大发送请求数。 该配置应用于每一个 broker 连接. 
Type: integer
metadata.request.timeout.ms*10 .. 90000060000无数据请求超时时间,毫秒。 适用于 metadata 请求等。
Type: integer
topic.metadata.refresh.interval.ms*-1 .. 3600000300000Topic metadata 刷新间隔,毫秒。metadata 自动刷新错误和连接。设置为 -1 关闭刷新间隔。 
Type: integer
metadata.max.age.ms*  参考 topic.metadata.refresh.interval.ms
topic.metadata.refresh.fast.cnt*0 .. 100010当 topic 丢失 leader, metadata 请求的发送次数,发送间隔是 topic.metadata.refresh.fast.interval.ms 而不是 topic.metadata.refresh.interval.ms。 该配置用于快速修复broker leader。
Type: integer
topic.metadata.refresh.fast.interval.ms*1 .. 60000250参考 topic.metadata.refresh.fast.cnt
Type: integer
topic.metadata.refresh.sparse*true, falsetrue极少的 metadata 请求 (消费者的网络带宽很小) 
Type: boolean
topic.blacklist*  Topic 黑名单,逗号分隔的正则表达式列表,匹配topic名字,匹配到的 topic 如果不存在,就在 broker metadata 信息中忽略。
Type: pattern list
debug*generic, broker, topic, metadata, queue, msg, protocol, cgrp, security, fetch, feature, all 逗号分隔的列表,控制 debug 上下文。调试生产者:broker,topic,msg。调试消费者:cgrp,topic,fetch 
Type: CSV flags
socket.timeout.ms*10 .. 30000060000网络请求超时时间。
Type: integer
socket.blocking.max.ms*1 .. 60000100broker 在 socket 操作时最大阻塞时间。值越低,响应越快,但会略微提高CPU使用率。
Type: integer
socket.send.buffer.bytes*0 .. 1000000000Broker socket 发送缓冲大小。系统默认为 0。
Type: integer
socket.receive.buffer.bytes*0 .. 1000000000Broker socket 接收缓冲大小。系统默认为 0。
Type: integer
socket.keepalive.enable*true, falsefalseBroker sockets 允许 TCP 保持活力 (SO_KEEPALIVE)。
Type: boolean
socket.max.fails*0 .. 10000003Broker 关闭连接的最大错误次数(e.g., timed out requests)。0不关闭。提示:连接自动重新建立。
Type: integer
broker.address.ttl*0 .. 864000001000保存 broker 地址响应结果的时间 (毫秒)。
Type: integer
broker.address.family*any, v4, v6any允许的 broker IP 地址族:any, v4, v6。
Type: enum value
reconnect.backoff.jitter.ms*0 .. 3600000500通过这个值调节 broker 重连尝试 +-50%。
Type: integer
statistics.interval.ms*0 .. 864000000librdkafka 统计间隔。应用程序需要通过 rd_kafka_conf_set_stats_cb()设置统计的回调函数。粒度是 1000ms. 0 关闭统计。
Type: integer
enabled_events*0 .. 21474836470参考 rd_kafka_conf_set_events()
Type: integer
error_cb*  错误回调函数 (参考 rd_kafka_conf_set_error_cb()) 
Type: pointer
throttle_cb*  调节回调函数 (参考 rd_kafka_conf_set_throttle_cb()) 
Type: pointer
stats_cb*  统计回调函数 (参考 rd_kafka_conf_set_stats_cb()) 
Type: pointer
log_cb*  日志回调函数 (参考 rd_kafka_conf_set_log_cb()) 
Type: pointer
log_level*0 .. 76日志界别 (syslog(3) levels) 
Type: integer
log.thread.name*true, falsefalse在日志消息中打印内部线程名。(useful for debugging librdkafka internals) 
Type: boolean
log.connection.close*true, falsetrue记录 broker 断开连接。由于受 0.9 版本 broker 的 connection.max.idle.ms 的影响,最好关闭。
Type: boolean
socket_cb*  为Socket创建回调函数提供无缝 CLOEXEC 
Type: pointer
open_cb*  为文件打开回调函数提供无缝 CLOEXEC 
Type: pointer
opaque*  对应用程序不开放 (set with rd_kafka_conf_set_opaque()) 
Type: pointer
default_topic_conf*  默认 topic 配置,用于自动订阅 topics 
Type: pointer
internal.termination.signal*0 .. 1280用于 librdkafka 调用 rd_kafka_destroy() 快速终止的信号。如果没有设置信号, 终止过程会延迟直到所有内部线程的系统调用超时返回,且 rd_kafka_wait_destroyed() 返回 true。如果设置了信号,延迟会最小化。应用程序需要屏蔽该信号,而作为内部信号句柄。
Type: integer
api.version.request*true, falsefalse请求 broker 支持的API版本,调整可用协议特性的功能。如果设置为false,将使用 broker.version.fallback设置的回退版本。 提示: 依赖的 broker 版本 >=0.10.0。如果 broker(老版本)不支持该请求,使用 broker.version.fallback设置的回退版本。
Type: boolean
api.version.fallback.ms*0 .. 6048000001200000配置 ApiVersionRequest 失败多长时间后,使用 broker.version.fallback 回退版本。提示: ApiVersionRequest 只用新的 broker 能使用。
Type: integer
broker.version.fallback* 0.9.0老版本的 broker(<0.10.0)不支持客户端查询支持协议特性(ApiVersionRequest, see api.version.request),所以要客户端不知道什么特性可以使用。 用户使用本属性指示 broker 版本,如果 ApiVersionRequest 失败(或不可用),客户端据此属性自动调整特性。与 api.version.fallback.ms 配合使用。有效值:0.9.0, 0.8.2, 0.8.1, 0.8.0. 
Type: string
security.protocol*plaintext, ssl, sasl_plaintext, sasl_sslplaintext与 broker 通讯的协议。
Type: enum value
ssl.cipher.suites*  密码套件是个组合体,包括鉴权,加密,认证和秘钥交换程序,用于网络连接的安全设置交换,使用 TLS 或 SSL 网络协议。查看手册 ciphers(1) 和 `SSL_CTX_set_cipher_list(3)。 
Type: string
ssl.key.location*  客户端的私钥(PEM)路径,用于鉴权。
Type: string
ssl.key.password*  私钥密码。
Type: string
ssl.certificate.location*  客户端的公钥(PEM)路径,用于鉴权。
Type: string
ssl.ca.location*  CA 证书文件或路径,用于校验 broker key。
Type: string
ssl.crl.location*  CRL 路径,用于 broker 的证书校验。
Type: string
sasl.mechanisms*GSSAPI, PLAINGSSAPI使用 SASL 机制鉴权。 支持:GSSAPI, PLAIN. 提示: 只能配置一种机制名。
Type: string
sasl.kerberos.service.name* kafkaKafka 运行的 Kerberos 首要名。
Type: string
sasl.kerberos.principal* kafkaclient客户端的 Kerberos 首要名。
Type: string
sasl.kerberos.kinit.cmd* kinit -S “%{sasl.kerberos.service.name}/%{broker.name}” -k -t “%{sasl.kerberos.keytab}” %{sasl.kerberos.principal}完整的 kerberos kinit 命令串,%{config.prop.name} 替换为与配置对象一直的值,%{broker.name} broker 的主机名。
Type: string
sasl.kerberos.keytab*  Kerberos keytab 文件的路径。如果不设置,则使用系统默认的。提示:不会自动使用,必须在 sasl.kerberos.kinit.cmd 中添加到模板,如 ... -t %{sasl.kerberos.keytab}。 
Type: string
sasl.kerberos.min.time.before.relogin*1 .. 8640000060000Key 恢复尝试的最小时间,毫秒。
Type: integer
sasl.username*  使用 PLAIN 机制时,SASL 用户名。
Type: string
sasl.password*  使用 PLAIN 机制时,SASL 密码。
Type: string
group.id*  客户端分组字符串。同组的客户端使用相同的 group.id。
Type: string
partition.assignment.strategy* range,roundrobinpartition 分配策略,当选举组 leader 时,分配 partition 给组成员的策略。
Type: string
session.timeout.ms*1 .. 360000030000客户端组会话探测失败超市时间。
Type: integer
heartbeat.interval.ms*1 .. 36000001000组会话保活心跳间隔。
Type: integer
group.protocol.type* consumer组协议类型。
Type: string
coordinator.query.interval.ms*1 .. 3600000600000多久查询一次当前的客户端组协调人。如果当前的分配协调人挂了,为了更快的恢复协调人,探测时间间隔会除以 10。
Type: integer
enable.auto.commitCtrue, falsetrue在后台周期性的自动提交偏移量。
Type: boolean
auto.commit.interval.msC0 .. 864000005000消费者偏移量提交(写入)到存储的频率,毫秒。(0 = 不可用) 
Type: integer
enable.auto.offset.storeCtrue, falsetrue为应用程序自动保存最后消息的偏移量。
Type: boolean
queued.min.messagesC1 .. 10000000100000每一个 topic+partition,本地消费者队列的最小消息数。
Type: integer
queued.max.messages.kbytesC1 .. 10000000001000000每一个 topic+partition,本地消费者队列的最大大小,单位kilobytes。该值应该大于 fetch.message.max.bytes。
Type: integer
fetch.wait.max.msC0 .. 300000100为写满fetch.min.bytes,broker 的最大等待时间。
Type: integer
fetch.message.max.bytesC1 .. 10000000001048576每一个 topic+partition 初始化的最大大小(bytes)用于从 broker 读消息。如果客户端遇到消息大于这个值,会逐步扩大直到塞下这个消息。
Type: integer
max.partition.fetch.bytesC  参考 fetch.message.max.bytes
fetch.min.bytesC1 .. 1000000001broker 请求的最小数据大小,单位bytes。如果达到 fetch.wait.max.ms 时间,则不管这个配置,将已收到的数据发送给客户端。
Type: integer
fetch.error.backoff.msC0 .. 300000500对于 topic+partition,如果接受错误,下一个接受请求间隔多长时间。
Type: integer
offset.store.methodCnone, file, brokerbroker偏移量存储方式:’file’ - 本地文件存储 (offset.store.path, et.al), ‘broker’ - 在 broker 上提交存储 (要求 Apache Kafka 0.8.2 或以后版本)。
Type: enum value
consume_cbC  消息消费回调函数 (参考 rd_kafka_conf_set_consume_cb()) 
Type: pointer
rebalance_cbC  消费者组重新分配后调用 (参考 rd_kafka_conf_set_rebalance_cb()) 
Type: pointer
offset_commit_cbC  偏移量提交结果回调函数 (参考 rd_kafka_conf_set_offset_commit_cb()) 
Type: pointer
enable.partition.eofCtrue, falsetrue当消费者到达分区结尾,发送 RD_KAFKA_RESP_ERR__PARTITION_EOF 事件。
Type: boolean
queue.buffering.max.messagesP1 .. 10000000100000生产者队列允许的最大消息数。
Type: integer
queue.buffering.max.kbytesP1 .. 21474836474000000生产者队列允许的最大大小,单位kb。
Type: integer
queue.buffering.max.msP1 .. 9000001000生产者队列缓存数据的最大时间,毫秒。
Type: integer
message.send.max.retriesP0 .. 100000002消息集发送失败重试次数。提示 重试会导致重排。
Type: integer
retriesP  参考 message.send.max.retries
retry.backoff.msP1 .. 300000100重试消息发送前的补偿时间。
Type: integer
compression.codecPnone, gzip, snappy, lz4none压缩消息集使用的压缩编解码器。这里配置的是所有 topic 的默认值,可能会被 topic 上的 compression.codec 属性覆盖。
Type: enum value
batch.num.messagesP1 .. 100000010000一个消息集最大打包消息数量。整个消息集的大小仍受限于 message.max.bytes。 
Type: integer
delivery.report.only.errorPtrue, falsefalse只对失败的消息提供分发报告。
Type: boolean
dr_cbP  分发报告回调函数 (参考 rd_kafka_conf_set_dr_cb()) 
Type: pointer
dr_msg_cbP  分发报告回调函数 (参考 rd_kafka_conf_set_dr_msg_cb()) 
Type: pointer

Topic 配置属性

属性C/P范围默认值描述
request.required.acksP-1 .. 10001这个字段标示 leader broker 要从 ISR broker 接收多少个 ack,然后才确认发送请求:0=不发送任何 response/ack 给客户端, 1=只有 leader broker 需要 ack 消息, -1 or all=broker 阻塞直到所有的同步备份(ISRs)或in.sync.replicas设置的备份返回消息提交的确认应答。
Type: integer
acksP  参考 request.required.acks
request.timeout.msP1 .. 9000005000生产者请求等待应答的超时时间,毫秒。这个值仅在 broker 上强制执行。参考 request.required.acks,不能等于 0. 
Type: integer
message.timeout.msP0 .. 900000300000本地消息超时时间。这个值仅在本地强制执行,限制生产的消息等待被成功发送的等待时间,0 是不限制。
Type: integer
produce.offset.reportPtrue, falsefalse报告生产消息的偏移量给应用程序。应用程序必须使用 dr_msg_cb从 rd_kafka_message_t.offset 中获取偏移量。
Type: boolean
partitioner_cbP  分区方法回调函数 (参考 rd_kafka_topic_conf_set_partitioner_cb()) 
Type: pointer
opaque*  应用程序不可见 (参考 rd_kafka_topic_conf_set_opaque()) 
Type: pointer
compression.codecPnone, gzip, snappy, lz4, inheritinherit压缩消息集的压缩编解码器。
Type: enum value
auto.commit.enableCtrue, falsetrue如果是 true,周期性的提交最后一个消息的偏移量。用于当程序重启时抛弃不用的消息。如果是 false,应用程序需要调用 rd_kafka_offset_store() 保存偏移量 (可选). 提示 这个属性时能用于简单消费者,high-level KafkaConsumer 会被全局的 enable.auto.commit 属性替代。提示 目前没有整合 zookeeper,根据 offset.store.method 的配置 偏移量将写入 broker 或本地文件 file according to offset.store.method. 
Type: boolean
enable.auto.commitC  参考 auto.commit.enable
auto.commit.interval.msC10 .. 8640000060000消费者偏移量提交(写入)到存储的频率,毫秒。
Type: integer
auto.offset.resetCsmallest, earliest, beginning, largest, latest, end, errorlargest如果偏移量存储还没有初始化或偏移量超过范围时的处理方式:Action to take when there is no initial offset in offset store or the desired offset is out of range: ‘smallest’,’earliest’ - 自动重设偏移量为最小偏移量,’largest’,’latest’ - 自动重设偏移量为最大偏移量,’error’ - 通过消费消息触发一个错误,请检查 message->err。 
Type: enum value
offset.store.pathC .存储偏移量的本地文件路径。如果路径是个目录,在目录下自动创建基于 topic 和 partition 的文件名。
Type: string
offset.store.sync.interval.msC-1 .. 86400000-1偏移量文件 fsync() 的间隔,毫秒。-1 不同步,0 每次写入后立即同步。
Type: integer
offset.store.methodCfile, brokerbroker偏移量存储方式:’file’ - 本地文件存储 (offset.store.path, et.al), ‘broker’ - 在 broker 上提交存储 (要求 Apache Kafka 0.8.2 或以后版本)。
Type: enum value
consume.callback.max.messagesC0 .. 10000000一次 rd_kafka_consume_callback*() 调配的最大消息数 (0 = 无限制) 
Type: integer

C/P 含义:C = 生产者, P = 消费者, * = 二者都有


Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐