acks(默认值为1)

        在消息被认为是“已提交”之前,producer需要leader确认请求的应答数。该参数用于控制消息的持久性,目前提供了3个取值:

        acks = 0: 表示producer请求立即返回,不需要等待leader的任何确认。这种方案有最高的吞吐率,但是不保证消息是否真的发送成功。

        acks = -1:表示分区leader必须等待消息被成功写入到所有的ISR副本(同步副本)中才认为请求成功。这种方案提供最高的消息持久性保证,但是理论上吞吐率也是最差的。

        acks = 1: 表示leader必须应答此请求并写入消息到本地日志则请求被认为成功。如果此时leader应答请求之后挂掉了,消息会丢失。折中的方案提供了不错的持久性保证和吞吐。

buffer.memory(默认值为33554432)

        该参数用于指定producer端用于缓存消息的缓冲区大小,单位为字节,默认值为:33554432,合计为32M。kafka采用的是异步发送的消息架构,producer启动时会首先创建一块内存缓冲区用于保存待发送的消息,然后由一个专属线程负责从缓冲区读取消息进行真正的发送。消息持续发送过程中,当缓冲区被填满后,producer立即进入阻塞状态直到空闲内存被释放出来,这段时间不能超过max.blocks.ms设置的值,一旦超过,producer则会抛出TimeoutException 异常,因为Producer是线程安全的,若一直报TimeoutException,需要考虑调高buffer.memory了。用户在使用多个线程共享kafka producer时,很容易把 buffer.memory 打满。

max.block.ms(默认值为60000)

        KafkaProducer.send() and KafkaProducer.partitionsFor() 方法最大的阻塞时间,当buffer满了或者集群的Metadata不可用时,这两个方法会被阻塞。 

batch.size(默认值为16384)

        Producer都是按照batch进行发送的,因此batch大小的选择对于producer性能至关重要。producer会把发往同一分区的多条消息封装进一个batch中,当batch满了后,producer才会把消息发送出去。但是也不一定等到满了,这和另外一个参数linger.ms有关。batch.size的默认值为16384,合计为16K。如果producer发送的目标topic有10个分区,则需要16K*10=160K的内存来缓存这些数据,这个值不能大于buffer.memory指定的值。

linger.ms(默认值为0)

        Producer是按照batch进行发送的,但是还要看linger.ms的值,默认是0,表示不做停留。这种情况下,可能有的batch中没有包含足够多的produce请求就被发送出去了,造成了大量的小batch,给网络IO带来的极大的压力。如果数据的产生速度在时间T内能达到batch.size大小,则linger.ms设置的值不应该比T小,否则batch.size将失去意义。

compression.type(默认值为none)

        Kafka是端到端压缩,producer端开启了压缩,则在数据发往Broker的过程中是压缩的,Broker端的存储也是压缩的,Consumer从Broker端拉去的数据也是压缩的。这样,不仅减小了带宽,也减少了Broker端的磁盘占用。目前支持none(不压缩),gzip,snappy和lz4。建议使用gzip或者lz4。

retries(默认值为2147483647)

        Producer重试的次数设置。重试时,producer会重新发送之前由于瞬时原因出现失败的消息。瞬时失败的原因可能包括:元数据信息失效、副本数量不足、超时、位移越界或未知分区等。倘若设置了retries > 0,那么这些情况下producer会尝试重试。producer还有个参数:max.in.flight.requests.per.connection。如果设置该参数大于1,那么设置retries就有可能造成发送消息的乱序。版本为0.11.1.0的kafka已经支持"精确到一次的语义”,因此消息的重试不会造成消息的重复发送。

retry.backoff.ms(默认值为100)

        两次retry的时间间隔,可以根据是网络情况和服务器情况调整,正常情况下没必要调整。

max.in.flight.requests.per.connection(默认值为5)

        Producer的IO线程在单个Socket连接上能够发送未应答请求的最大数量。即客户端到服务端的网络上最多允许的请求数量。增加此值应该可以增加IO线程的吞吐量,从而整体上提升producer的性能。不过就像之前说的如果。开启了重试机制,那么设置该参数大于1的话有可能造成消息的乱序。默认值5是一个比较好的起始点,如果发现producer的瓶颈在IO线程,同时各个broker端负载不高,那么可以尝试适当增加该值.过大增加该参数会造成producer的整体内存负担,同时还可能造成不必要的锁竞争反而会降低TPS。

max.request.size(默认值为1048576)

    Producer单次发往某个Borker的请求最大值。Sender线程将属于某个Broker的多个ProducerBatch封装成一个ClientRequest,多个ProducerBatch大小之和不能超过max.request.size设置的值。max.request.size设置的值不应该比Broker端设置的message.max.bytes大。

enable.idempotence(默认值为false)

        是否使用幂等性。如果设置为true,表示producer将确保每一条消息都恰好有一份备份;如果设置为false,则表示producer因发送数据到broker失败重试使,可能往数据流中写入多分重试的消息。如果enable.idempotence为true,那么要求配置项max.in.flight.requests.per.connection的值必须小于或等于5;配置项retries的值必须大于0;acks配置项必须设置为all。如果这些值没有被用户明确地设置,那么系统将自动选择合适的值。如果设置的值不合适,那么会抛出ConfigException异常。

request.timeout.ms(默认值为30000)

        Producer向Broker发送请求以后,等待响应的最长时间。

 

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐