librdkafka配置参数
CONFIGURATION.md全局配置属性属性C/P范围默认值描述builtin.features* gzip, snappy, ssl, sasl, regex, lz4标示该librdkafka的支持的内建特性。应用程序可以查看或设置这些值来检查是否支持这些特性。Type: CSV flagsclient.id* rdkafka客户端标示。Type: stringmetadata.broke
·
全局配置属性
属性 | C/P | 范围 | 默认值 | 描述 |
---|---|---|---|---|
builtin.features | * | gzip, snappy, ssl, sasl, regex, lz4 | 标示该librdkafka的支持的内建特性。应用程序可以查看或设置这些值来检查是否支持这些特性。 Type: CSV flags | |
client.id | * | rdkafka | 客户端标示。Type: string | |
metadata.broker.list | * | 初始化的broker列表。应用程序也可以使用 rd_kafka_brokers_add() 在运行时添加 broker。Type: string | ||
bootstrap.servers | * | 参考 metadata.broker.list | ||
message.max.bytes | * | 1000 .. 1000000000 | 1000000 | 最大发送消息大小。 Type: integer |
message.copy.max.bytes | * | 0 .. 1000000000 | 65535 | 消息拷贝到缓存的最大大小。如果消息大于这个值,将会消耗更多的iovec而采用引用(零拷贝)方式。 Type: integer |
receive.message.max.bytes | * | 1000 .. 1000000000 | 100000000 | 最大接收消息大小。这是一个安全预防措施,防止协议饱和时内存耗尽。这个值至少为 fetch.message.max.bytes * 消费者分区数 + 消息头大小 (e.g. 200000 bytes). Type: integer |
max.in.flight.requests.per.connection | * | 1 .. 1000000 | 1000000 | 客户端保持的最大发送请求数。 该配置应用于每一个 broker 连接. Type: integer |
metadata.request.timeout.ms | * | 10 .. 900000 | 60000 | 无数据请求超时时间,毫秒。 适用于 metadata 请求等。 Type: integer |
topic.metadata.refresh.interval.ms | * | -1 .. 3600000 | 300000 | Topic metadata 刷新间隔,毫秒。metadata 自动刷新错误和连接。设置为 -1 关闭刷新间隔。 Type: integer |
metadata.max.age.ms | * | 参考 topic.metadata.refresh.interval.ms | ||
topic.metadata.refresh.fast.cnt | * | 0 .. 1000 | 10 | 当 topic 丢失 leader, metadata 请求的发送次数,发送间隔是 topic.metadata.refresh.fast.interval.ms 而不是 topic.metadata.refresh.interval.ms 。 该配置用于快速修复broker leader。Type: integer |
topic.metadata.refresh.fast.interval.ms | * | 1 .. 60000 | 250 | 参考 topic.metadata.refresh.fast.cnt 。Type: integer |
topic.metadata.refresh.sparse | * | true, false | true | 极少的 metadata 请求 (消费者的网络带宽很小) Type: boolean |
topic.blacklist | * | Topic 黑名单,逗号分隔的正则表达式列表,匹配topic名字,匹配到的 topic 如果不存在,就在 broker metadata 信息中忽略。 Type: pattern list | ||
debug | * | generic, broker, topic, metadata, queue, msg, protocol, cgrp, security, fetch, feature, all | 逗号分隔的列表,控制 debug 上下文。调试生产者:broker,topic,msg。调试消费者:cgrp,topic,fetch Type: CSV flags | |
socket.timeout.ms | * | 10 .. 300000 | 60000 | 网络请求超时时间。 Type: integer |
socket.blocking.max.ms | * | 1 .. 60000 | 100 | broker 在 socket 操作时最大阻塞时间。值越低,响应越快,但会略微提高CPU使用率。 Type: integer |
socket.send.buffer.bytes | * | 0 .. 100000000 | 0 | Broker socket 发送缓冲大小。系统默认为 0。 Type: integer |
socket.receive.buffer.bytes | * | 0 .. 100000000 | 0 | Broker socket 接收缓冲大小。系统默认为 0。 Type: integer |
socket.keepalive.enable | * | true, false | false | Broker sockets 允许 TCP 保持活力 (SO_KEEPALIVE)。 Type: boolean |
socket.max.fails | * | 0 .. 1000000 | 3 | Broker 关闭连接的最大错误次数(e.g., timed out requests)。0不关闭。提示:连接自动重新建立。 Type: integer |
broker.address.ttl | * | 0 .. 86400000 | 1000 | 保存 broker 地址响应结果的时间 (毫秒)。 Type: integer |
broker.address.family | * | any, v4, v6 | any | 允许的 broker IP 地址族:any, v4, v6。 Type: enum value |
reconnect.backoff.jitter.ms | * | 0 .. 3600000 | 500 | 通过这个值调节 broker 重连尝试 +-50%。 Type: integer |
statistics.interval.ms | * | 0 .. 86400000 | 0 | librdkafka 统计间隔。应用程序需要通过 rd_kafka_conf_set_stats_cb() 设置统计的回调函数。粒度是 1000ms. 0 关闭统计。Type: integer |
enabled_events | * | 0 .. 2147483647 | 0 | 参考 rd_kafka_conf_set_events() 。Type: integer |
error_cb | * | 错误回调函数 (参考 rd_kafka_conf_set_error_cb()) Type: pointer | ||
throttle_cb | * | 调节回调函数 (参考 rd_kafka_conf_set_throttle_cb()) Type: pointer | ||
stats_cb | * | 统计回调函数 (参考 rd_kafka_conf_set_stats_cb()) Type: pointer | ||
log_cb | * | 日志回调函数 (参考 rd_kafka_conf_set_log_cb()) Type: pointer | ||
log_level | * | 0 .. 7 | 6 | 日志界别 (syslog(3) levels) Type: integer |
log.thread.name | * | true, false | false | 在日志消息中打印内部线程名。(useful for debugging librdkafka internals) Type: boolean |
log.connection.close | * | true, false | true | 记录 broker 断开连接。由于受 0.9 版本 broker 的 connection.max.idle.ms 的影响,最好关闭。Type: boolean |
socket_cb | * | 为Socket创建回调函数提供无缝 CLOEXEC Type: pointer | ||
open_cb | * | 为文件打开回调函数提供无缝 CLOEXEC Type: pointer | ||
opaque | * | 对应用程序不开放 (set with rd_kafka_conf_set_opaque()) Type: pointer | ||
default_topic_conf | * | 默认 topic 配置,用于自动订阅 topics Type: pointer | ||
internal.termination.signal | * | 0 .. 128 | 0 | 用于 librdkafka 调用 rd_kafka_destroy() 快速终止的信号。如果没有设置信号, 终止过程会延迟直到所有内部线程的系统调用超时返回,且 rd_kafka_wait_destroyed() 返回 true。如果设置了信号,延迟会最小化。应用程序需要屏蔽该信号,而作为内部信号句柄。 Type: integer |
api.version.request | * | true, false | false | 请求 broker 支持的API版本,调整可用协议特性的功能。如果设置为false,将使用 broker.version.fallback 设置的回退版本。 提示: 依赖的 broker 版本 >=0.10.0。如果 broker(老版本)不支持该请求,使用 broker.version.fallback 设置的回退版本。Type: boolean |
api.version.fallback.ms | * | 0 .. 604800000 | 1200000 | 配置 ApiVersionRequest 失败多长时间后,使用 broker.version.fallback 回退版本。提示: ApiVersionRequest 只用新的 broker 能使用。Type: integer |
broker.version.fallback | * | 0.9.0 | 老版本的 broker(<0.10.0)不支持客户端查询支持协议特性 (ApiVersionRequest, see api.version.request ),所以要客户端不知道什么特性可以使用。 用户使用本属性指示 broker 版本,如果 ApiVersionRequest 失败(或不可用),客户端据此属性自动调整特性。与 api.version.fallback.ms 配合使用。有效值:0.9.0, 0.8.2, 0.8.1, 0.8.0. Type: string | |
security.protocol | * | plaintext, ssl, sasl_plaintext, sasl_ssl | plaintext | 与 broker 通讯的协议。 Type: enum value |
ssl.cipher.suites | * | 密码套件是个组合体,包括鉴权,加密,认证和秘钥交换程序,用于网络连接的安全设置交换,使用 TLS 或 SSL 网络协议。查看手册 ciphers(1) 和 `SSL_CTX_set_cipher_list(3)。 Type: string | ||
ssl.key.location | * | 客户端的私钥(PEM)路径,用于鉴权。 Type: string | ||
ssl.key.password | * | 私钥密码。 Type: string | ||
ssl.certificate.location | * | 客户端的公钥(PEM)路径,用于鉴权。 Type: string | ||
ssl.ca.location | * | CA 证书文件或路径,用于校验 broker key。 Type: string | ||
ssl.crl.location | * | CRL 路径,用于 broker 的证书校验。 Type: string | ||
sasl.mechanisms | * | GSSAPI, PLAIN | GSSAPI | 使用 SASL 机制鉴权。 支持:GSSAPI, PLAIN. 提示: 只能配置一种机制名。 Type: string |
sasl.kerberos.service.name | * | kafka | Kafka 运行的 Kerberos 首要名。 Type: string | |
sasl.kerberos.principal | * | kafkaclient | 客户端的 Kerberos 首要名。 Type: string | |
sasl.kerberos.kinit.cmd | * | kinit -S “%{sasl.kerberos.service.name}/%{broker.name}” -k -t “%{sasl.kerberos.keytab}” %{sasl.kerberos.principal} | 完整的 kerberos kinit 命令串,%{config.prop.name} 替换为与配置对象一直的值,%{broker.name} broker 的主机名。 Type: string | |
sasl.kerberos.keytab | * | Kerberos keytab 文件的路径。如果不设置,则使用系统默认的。提示:不会自动使用,必须在 sasl.kerberos.kinit.cmd 中添加到模板,如 ... -t %{sasl.kerberos.keytab} 。 Type: string | ||
sasl.kerberos.min.time.before.relogin | * | 1 .. 86400000 | 60000 | Key 恢复尝试的最小时间,毫秒。 Type: integer |
sasl.username | * | 使用 PLAIN 机制时,SASL 用户名。 Type: string | ||
sasl.password | * | 使用 PLAIN 机制时,SASL 密码。 Type: string | ||
group.id | * | 客户端分组字符串。同组的客户端使用相同的 group.id。 Type: string | ||
partition.assignment.strategy | * | range,roundrobin | partition 分配策略,当选举组 leader 时,分配 partition 给组成员的策略。 Type: string | |
session.timeout.ms | * | 1 .. 3600000 | 30000 | 客户端组会话探测失败超市时间。 Type: integer |
heartbeat.interval.ms | * | 1 .. 3600000 | 1000 | 组会话保活心跳间隔。 Type: integer |
group.protocol.type | * | consumer | 组协议类型。 Type: string | |
coordinator.query.interval.ms | * | 1 .. 3600000 | 600000 | 多久查询一次当前的客户端组协调人。如果当前的分配协调人挂了,为了更快的恢复协调人,探测时间间隔会除以 10。 Type: integer |
enable.auto.commit | C | true, false | true | 在后台周期性的自动提交偏移量。 Type: boolean |
auto.commit.interval.ms | C | 0 .. 86400000 | 5000 | 消费者偏移量提交(写入)到存储的频率,毫秒。(0 = 不可用) Type: integer |
enable.auto.offset.store | C | true, false | true | 为应用程序自动保存最后消息的偏移量。 Type: boolean |
queued.min.messages | C | 1 .. 10000000 | 100000 | 每一个 topic+partition,本地消费者队列的最小消息数。 Type: integer |
queued.max.messages.kbytes | C | 1 .. 1000000000 | 1000000 | 每一个 topic+partition,本地消费者队列的最大大小,单位kilobytes。该值应该大于 fetch.message.max.bytes。 Type: integer |
fetch.wait.max.ms | C | 0 .. 300000 | 100 | 为写满fetch.min.bytes ,broker 的最大等待时间。Type: integer |
fetch.message.max.bytes | C | 1 .. 1000000000 | 1048576 | 每一个 topic+partition 初始化的最大大小(bytes)用于从 broker 读消息。如果客户端遇到消息大于这个值,会逐步扩大直到塞下这个消息。 Type: integer |
max.partition.fetch.bytes | C | 参考 fetch.message.max.bytes | ||
fetch.min.bytes | C | 1 .. 100000000 | 1 | broker 请求的最小数据大小,单位bytes。如果达到 fetch.wait.max.ms 时间,则不管这个配置,将已收到的数据发送给客户端。 Type: integer |
fetch.error.backoff.ms | C | 0 .. 300000 | 500 | 对于 topic+partition,如果接受错误,下一个接受请求间隔多长时间。 Type: integer |
offset.store.method | C | none, file, broker | broker | 偏移量存储方式:’file’ - 本地文件存储 (offset.store.path, et.al), ‘broker’ - 在 broker 上提交存储 (要求 Apache Kafka 0.8.2 或以后版本)。 Type: enum value |
consume_cb | C | 消息消费回调函数 (参考 rd_kafka_conf_set_consume_cb()) Type: pointer | ||
rebalance_cb | C | 消费者组重新分配后调用 (参考 rd_kafka_conf_set_rebalance_cb()) Type: pointer | ||
offset_commit_cb | C | 偏移量提交结果回调函数 (参考 rd_kafka_conf_set_offset_commit_cb()) Type: pointer | ||
enable.partition.eof | C | true, false | true | 当消费者到达分区结尾,发送 RD_KAFKA_RESP_ERR__PARTITION_EOF 事件。 Type: boolean |
queue.buffering.max.messages | P | 1 .. 10000000 | 100000 | 生产者队列允许的最大消息数。 Type: integer |
queue.buffering.max.kbytes | P | 1 .. 2147483647 | 4000000 | 生产者队列允许的最大大小,单位kb。 Type: integer |
queue.buffering.max.ms | P | 1 .. 900000 | 1000 | 生产者队列缓存数据的最大时间,毫秒。 Type: integer |
message.send.max.retries | P | 0 .. 10000000 | 2 | 消息集发送失败重试次数。提示 重试会导致重排。 Type: integer |
retries | P | 参考 message.send.max.retries | ||
retry.backoff.ms | P | 1 .. 300000 | 100 | 重试消息发送前的补偿时间。 Type: integer |
compression.codec | P | none, gzip, snappy, lz4 | none | 压缩消息集使用的压缩编解码器。这里配置的是所有 topic 的默认值,可能会被 topic 上的 compression.codec 属性覆盖。Type: enum value |
batch.num.messages | P | 1 .. 1000000 | 10000 | 一个消息集最大打包消息数量。整个消息集的大小仍受限于 message.max.bytes 。 Type: integer |
delivery.report.only.error | P | true, false | false | 只对失败的消息提供分发报告。 Type: boolean |
dr_cb | P | 分发报告回调函数 (参考 rd_kafka_conf_set_dr_cb()) Type: pointer | ||
dr_msg_cb | P | 分发报告回调函数 (参考 rd_kafka_conf_set_dr_msg_cb()) Type: pointer |
Topic 配置属性
属性 | C/P | 范围 | 默认值 | 描述 |
---|---|---|---|---|
request.required.acks | P | -1 .. 1000 | 1 | 这个字段标示 leader broker 要从 ISR broker 接收多少个 ack,然后才确认发送请求:0=不发送任何 response/ack 给客户端, 1=只有 leader broker 需要 ack 消息, -1 or all=broker 阻塞直到所有的同步备份(ISRs)或in.sync.replicas 设置的备份返回消息提交的确认应答。Type: integer |
acks | P | 参考 request.required.acks | ||
request.timeout.ms | P | 1 .. 900000 | 5000 | 生产者请求等待应答的超时时间,毫秒。这个值仅在 broker 上强制执行。参考 request.required.acks ,不能等于 0. Type: integer |
message.timeout.ms | P | 0 .. 900000 | 300000 | 本地消息超时时间。这个值仅在本地强制执行,限制生产的消息等待被成功发送的等待时间,0 是不限制。 Type: integer |
produce.offset.report | P | true, false | false | 报告生产消息的偏移量给应用程序。应用程序必须使用 dr_msg_cb 从 rd_kafka_message_t.offset 中获取偏移量。Type: boolean |
partitioner_cb | P | 分区方法回调函数 (参考 rd_kafka_topic_conf_set_partitioner_cb()) Type: pointer | ||
opaque | * | 应用程序不可见 (参考 rd_kafka_topic_conf_set_opaque()) Type: pointer | ||
compression.codec | P | none, gzip, snappy, lz4, inherit | inherit | 压缩消息集的压缩编解码器。 Type: enum value |
auto.commit.enable | C | true, false | true | 如果是 true,周期性的提交最后一个消息的偏移量。用于当程序重启时抛弃不用的消息。如果是 false,应用程序需要调用 rd_kafka_offset_store() 保存偏移量 (可选). 提示 这个属性时能用于简单消费者,high-level KafkaConsumer 会被全局的 enable.auto.commit 属性替代。提示 目前没有整合 zookeeper,根据 offset.store.method 的配置 偏移量将写入 broker 或本地文件 file according to offset.store.method. Type: boolean |
enable.auto.commit | C | 参考 auto.commit.enable | ||
auto.commit.interval.ms | C | 10 .. 86400000 | 60000 | 消费者偏移量提交(写入)到存储的频率,毫秒。 Type: integer |
auto.offset.reset | C | smallest, earliest, beginning, largest, latest, end, error | largest | 如果偏移量存储还没有初始化或偏移量超过范围时的处理方式:Action to take when there is no initial offset in offset store or the desired offset is out of range: ‘smallest’,’earliest’ - 自动重设偏移量为最小偏移量,’largest’,’latest’ - 自动重设偏移量为最大偏移量,’error’ - 通过消费消息触发一个错误,请检查 message->err 。 Type: enum value |
offset.store.path | C | . | 存储偏移量的本地文件路径。如果路径是个目录,在目录下自动创建基于 topic 和 partition 的文件名。 Type: string | |
offset.store.sync.interval.ms | C | -1 .. 86400000 | -1 | 偏移量文件 fsync() 的间隔,毫秒。-1 不同步,0 每次写入后立即同步。 Type: integer |
offset.store.method | C | file, broker | broker | 偏移量存储方式:’file’ - 本地文件存储 (offset.store.path, et.al), ‘broker’ - 在 broker 上提交存储 (要求 Apache Kafka 0.8.2 或以后版本)。 Type: enum value |
consume.callback.max.messages | C | 0 .. 1000000 | 0 | 一次 rd_kafka_consume_callback*() 调配的最大消息数 (0 = 无限制) Type: integer |
C/P 含义:C = 生产者, P = 消费者, * = 二者都有
更多推荐
已为社区贡献1条内容
所有评论(0)