下面聊聊Kafka的配置参数,包括生产者的配置参数、Broker的配置参数、消费者的配置参数。

1、生产者配置参数

  • acks
    该参数控制了生产者的消息发送确认机制,用于指定分区中必须有多少个副本成功接收到消息后生产者才会认为这条消息写入是成功的,即生产者需要Leader副本确认请求完成之前接收的应答数。
    该参数有3个值,含义如下
    在这里插入图片描述

  • buffer.memory
    Kafka生产者的Sender子线程在将消息批量发送到Kafka Broker端之前,会把消息先缓存到内存中,该参数决定了消息缓存的大小,默认值为32M;若生产者生产消息的速度大于将消息发送到Broker端的速度,那么生产者将会被堵塞,最终导致生产者抛出RecordTooLargeException异常;

  • batch.size
    该参数标识当Kafka客户端将多个消息发送到同一个分区的时候,生产者为例减少客户端与Broker端的网络请求数,会尝试先将消息批量打包一起进行统一发送,有助于提升网络吞吐量,其默认值为16KB。

  • compression.type
    该参数指定给到Topic中数据的压缩类型,共有4种压缩方式,gzip、snappy、lz4、zstd;也可以设置为uncompressd即不压缩。

  • client.id
    该参数表示唯一的id字符串标识。

  • connection.max.idle.ms
    该参数决定关闭生产者连接的时间阈值,其默认值为9min。

  • linger.ms
    该参数决定消息在生产者发送到Broker端之前,在客户端延迟发送的时间。

  • max.block.ms
    该参数用来控制send将消息堵塞多长时间,其默认值为60000ms。

  • max.request.size
    该参数表示生产者能发送消息的最大值,其默认值为1MB。

  • retries、retry.backoff.ms
    该参数表示通过内部的重试机制来执行恢复,并部署之间将异常抛出,如果重试次数达到设定的次数,生产者才会放弃重试并抛出异常,其默认值为0。

  • receive.buffer.bytes
    该参数用来设置接收消息缓冲区大小,其默认值为32KB。

  • send.buffer.bytes
    该参数用来设置发送消息缓冲区的大小,其默认值为128KB。

  • request.timeout.ms
    该参数用来决定生产者等待请求响应的最大时间,其默认值为30000ms。

  • reconnect.backoff.max.ms
    该参数表示Kafka客户端重连的最大时间,每次连接失败,重连时间都会成指数级增加,每次增加的时间会在20%随机浮动,以避免连接风暴出现。

  • reconnect.backoff.ms
    该参数表示Kafka客户端每次重连时候的间隔时间。

  • delivery.timeout.ms
    该参数用于指定客户端等待发送成功或失败时,客户端等待的时间上限。

  • partitioner.class
    该参数表示进行分区操作的类,其默认值为DefaultPartitioner。

  • transaction.timeout.ms
    该参数生产者主动终止当前正在进行的操作之前,Kafka等待操作更新的最大时间,其默认值为1min。

  • transaction.id
    该参数表示某个事务的id。

  • max.in.flight.requests.per.connection
    该参数表示在消息备堵塞前,每个客户端上发送的为应答请求的最大数量,其默认值为5。

  • metadata.max.age.ms
    该参数表示当超过这个时间间隔时,系统就会触发更新元数据,其默认值为5min。

  • metadata.max.idle.ms
    该参数用来控制生产者获取Topic元数据的时间。

2、Borker配置参数

  • broker.id
    该参数用来标识唯一的broker server,若不设置或broker.id<0,则会自动计算。
  • log.dir
    该参数用来设置log数据存放的目录,其默认值为/tmp/kafka-logs。
  • log.dirs
    该参数用来设置log数据存放的目录,如/tmp/kafka-logs。另外可以用逗号隔开设置多个目录,主要用来挂载到多个磁盘上,如/tmp/kafka0、/tmp/kafka1、/tmp/kafka2。若没有设置该参数,则使用log.dir配置替代。
  • listeners
    该参数为监听器配置。
  • advertised.listeners
    该参数用来配置Broker对外发布的监听器列表,存放在zookeeper中,客户端可以通过元数据信息拿到这个监听器列表对该Broker进行访问,如果为空,则默认会用listeners配置。
  • zookeeper.connect
    该参数用来已hostname:port形式指定zookeeper连接,也可以用逗号形式指定多个zookeeper连接。
  • auto.create.topics.enable
    该参数用来配置是否可以自动创建Topic。

3、消费者配置参数

  • key.deserializer
    该参数用来实现key的反序列化。
  • value.deserializer
    该参数用来实现value的反序列化。
  • bootstrap.servers
    该参数表示Kafka Broker集群的地址信息,格式为ip1:port、ip2:port等,不需要设定全部的集群地址,设置两个或两个以上即可。
  • group.id
    该参数表示消费者组的id,如果相同表示属于同一个消费者组,必须指定,否则报异常。
  • fetch.min.bytes
    该参数用来配置消费者在一次拉取请求中能从Kafka Broker中拉取的最小数据量,即调用poll方式时每次拉取的数据量,其默认值为1字节。
  • fetch.max.bytes
    与fetch.min.bytes参数对应,用来配置消费者在一次拉取请求中从Kafka Broker端中拉取的最大数据量,其默认值为50MB。
  • fetch.max.wait.ms
    该参数于fetch.min.bytes有关,若Kafka Broker端返回给消费者的数据量小于fetch.min.bytes值,消费者就需要等待,指定数据量满足参数的配置大小。
  • max.poll.records
    该参数用来配置消费者在一次拉取请求中拉取的最大消息数,其默认值为500条;若消息数都比较小,可以适当调大该参数提升消费速度。
  • max.partition.fetch.bytes
    该参数用来配置从每个分区返回给消费者的最大数据量,其默认值为1MB。
  • connections.max.idle.ms
    该参数用来指定在多长时间之后关闭闲置的消费者连接,其默认值为9min。
  • heartbeat.inerval.ms
    该参数用来设置于消费者协调器之间的心跳间隔时间。
  • send.buffer.bytes
    该参数用来设置发送消息缓冲区的大小,其默认值为128KB;若设置为-1,则使用系统默认值。
  • receive.buffer.bytes
    该参数用来设置接收消息缓冲区的大小,其默认值为64KB;若设置为-1,则使用系统默认值。
  • request.timeout.ms
    该参数用来配置消费者等待请求响应的最长时间,其默认值为30s。
  • reconnect.backoff.ms
    该参数用来配置消费者每次尝试重新连接指定Broker之前应该等待的时间,其默认值为50s。
  • auto.offset.reset
    该参数有3个值,含义如下
    在这里插入图片描述
  • enable.auto.commit
    该参数用来配置是否开启自动提交消费位移的功能,默认为开启。
  • auto.commit.inerval.ms
    该参数只有当enable.auto.commit为true时才生效,即开启自动提交偏移量功能时自动提交消费位移的时间间隔,其默认值为5s。
  • partition.assignment.strategy
    该参数用来配置消费者的分区分配策略,支持轮询、范围策略。
  • inerceptor.class
    该参数用来配置消费者的拦截器,必须实现ConsumerInterceptor接口,使用消费者拦截器可以允许用户截取消费者接收的消息,从而改变消息内容。默认是没有设置拦截器。
  • exclude.internal.topics
    该参数用来配置Kafka中的内部主题是否可以向消费者公开,其默认值为true。在Kafka中,有两个内部主题:_consumer_offsets、_transaction_state。

期待大家来指导~

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐