注:此文并非官方文档的翻译

kafka的producer默认是异步的方式,在大数据量的情况下可能会出现丢失数据的情况.但是同步的方式又比较低效,因此合理设置异步producer下的kafka参数既可以提高效率又可以不丢失数据.只是要对各参数有一个比较深入的了解.下面是我总结的对于处理安全外几乎所有producer参数的理解:

以python客户端为例子,java的参数名可能稍有不同但是含义是一样的

例子:
producer = KafkaProducer(bootstrap_servers='xxx1:9092,xxx2:9092',
        acks=1,retries =3,
        batch_size=524288,
        reconnect_backoff_max_ms=3000,
        buffer_memory=536870912
        )
  
// producer默认是异步的
f = producer.send("topic_name","hello")

//f.get(timeout=3) 如果加了get就变成了同步,也就是说要等待get到服务端返回的结果后再往下执行

bootstrap_servers

格式为host[:port]例如localhost:9092,是kafka连接的broker地址列表,可以是多台,用逗号分隔

client_id (str)

客户端名称,用来追查日志的,默认是kafka-python-producer-# (#是个唯一编号)

key_serializer (callable)

key序列化函数. 默认值: None.

value_serializer (callable)

值序列化函数默认值: None.

acks (0, 1, 'all')

代表kafka收到消息的答复数,0就是不要答复,爱收到没收到.1就是有一个leader broker答复就行,all是所有broker都要收到才行

  • 0: Producer不等待kafka服务器的答复,消息立刻发往socket buffer,这种方式不能保证kafka收到消息,设置成这个值的时候retries参数就失效了,因为producer不知道kafka收没收到消息,所以所谓的重试就没有意义了,发送返回值的offset全默认是-1.

  • 1: 等待leader记录数据到broker本地log即可.不等待leader同步到其他followers,那么假如此时刚好leader收到消息并答复后,leader突然挂了,其他fowller还没来得及复制消息呢,那么这条消息就会丢失了.

  • all:等待所有broker记录消息.保证消息不会丢失(只要从节点没全挂),这种方式是最高可用的 acks默认值是1.

compression_type (str)

发消息时候的压缩类型可以是gzip,snappy,lz4,None,压缩是针对batches的,所以batches的大小会影响压缩效率,大一点的压缩比例可能好些,要是太小的话压缩就没有意义了,比如你就发个几个字节的数据那压完没准更大了.至于什么时候启用压缩,要看应用场景,启用后producer会变慢,但网络传输带宽占用会减少,带宽紧缺建议开启压缩,带宽充足就不用开了 默认值:None

retries (int)

重试发送次数,有时候网络出现短暂的问题的时候,会自动重发消息,前面提到了这个值是需要在acks=1或all时候才有效.如果设置了该参数,但是setting max_in_flight_requests_per_connection没有设置为1的话,可能造成消息顺序的改变,因为如果2个batches发到同一个partition,但是第一个失败重发了,那么就会造成第二个batches跑到前面去了. Default: 0.

batch_size (int)

批处理消息字节数,发往broker的消息会包含多个batches,每个分区对应一个batch,batch小了会减小响吞吐量,batch为0的话就禁用了batch发送,默认值:16384(16kb)

linger_ms (int)

逗留时间,这个逗留指的是消息不立即发送,而是逗留这个时间后一块发送,这个设置是比较有用的,有时候消息产生的要比能够发送的要快,这个参数完美的实现了一个人工的延迟,使得大批量可以聚合到一个batch里一块发送.当batch慢了的话,会忽略这个参数立即发送,这个参数有点类似TCP协议中的Nagle算法. Default: 0.

partitioner (callable)

分区函数,用来人工干预消息发到到哪个分区,这个函数会在key serialization后调用,函数原型:partitioner(key_bytes, all_partitions, available_partitions), 默认是对key做murmur2算法的hash(跟java客户端算法相同),hash值相同的到一个分区,没有key的话就是随机分区.

buffer_memory (int)

当消息发送速度大于kafka服务器接收的速度,producer会阻塞max_block_ms,超时会报异常,buffer_memory用来保存等待发送的消息,默认33554432(32MB)

max_block_ms (int)

当buffer满了或者metadata获取不到(比如leader挂了),或者序列化没完成分区函数没计算完等等情况下的最大阻塞时间,默认60000ms (60秒)

max_request_size (int)

消息的最大大小限制,也就是说send的消息大小不能超过这个限制,这个限制只改一个地方是不行的producer, broker, consumer都要改才行. Default: 1048576.(1MB)

metadata_max_age_ms (int)

metadata的刷新时间,每经过这个时间就刷新下metadata来发现新的分区或broker,不管有没有都会刷新下看看 Default: 300000(5分钟)

retry_backoff_ms (int)

– 重试发送如果还是错误要等待下次重试的时间,单位毫秒 Default: 100.

request_timeout_ms (int)

发送请求的超时时间 Default: 30000.(30秒)

receive_buffer_bytes (int)

TCP receiver buffer(SO_RCVBUF)大小,就是接收数据的缓冲区大小 默认:None(根据操作系统设置).Java Client默认是32768

send_buffer_bytes (int)

TCP send buffer (SO_SNDBUF) 发送数据的缓冲区大小 默认:None(根据操作系统设置).Java Client默认是131072.

socket_options (list)

socket选项 默认: [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]

reconnect_backoff_ms (int)

尝试重新连接broker的时间间隔 Default: 50.

reconnect_backoff_max_ms (int)

reconnect_backoff_ms每次再次连接失败会以指数增长,增长到的最大限度就是这个参数,为了避免连接风暴,连接重试的时间间隔会在一个范围内随机调整,上浮或下调20%,也就是说每次重连的时间间隔不一定就是这个值本身,而是上下浮动20%. 另外这里解释下连接风暴,当我们的kafka集群出现问题后,所有的producer和consumer都会尝试重连,重连的间隔就会达到这个参数所设置的最大值,比如大家都是每秒尝试重连,这时候如果集群回复了,那么在同一秒可能就会有大量的连接打到kafka集群上,这就造成了连接风暴,但是如果随机上下浮动就可能把重连时间给错开,不会造成同事的大量连接 Default: 1000.

max_in_flight_requests_per_connection (int)

发送多少条消息后,接收服务端确认,比如设置为1,就是每发一条就要确认一条,设置为5就是,发送5条消息等待一次确认 ,如果大于1,比如5,这种情况下是会有一个顺序问题的,就是这5条消息其中的一条发送失败了,如果进行重试,那么重发的消息其实是在下个批次的,这就会造成消息顺序的错乱,所以如果需要保证消息的顺序,建议设置此参数为1 Default: 5.

转载于:https://my.oschina.net/u/218540/blog/1794669

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐