Kafka生产者原理与优化
Kafka生产者生产消息,基本上包括下面几个部分:线程:多个线程可以操作同一个kafkaProducer去生产消息,并且可以给多个不同的topic生产消息kafkaProducer: kafka消息生产者,具体对应kafka的一个java类,里面包含了partitioner分区器(根据具体消息的key去选择发送到哪个分区,如果没有key则轮询选择分区);accumulator累加器(针对缓冲区进行
目录
1.Kafka生产者原理:
1.基础流程:
Kafka生产者生产消息,基本上包括下面4个部分:
- 线程:多个线程可以操作同一个kafkaProducer去生产消息,并且可以给多个不同的topic生产消息。
- kafkaProducer: kafka消息生产者,具体对应kafka的一个java类,里面包含了partitioner分区器(根据具体消息的key去选择发送到哪个分区,如果没有key则轮询选择分区);accumulator累加器(针对缓冲区进行操作,缓存产生的消息,缓冲区大小可以根据业务调优-调优,默认是32M),还有双向队列dq用来缓存消息(缓冲区中topic->partition),dq中的又包含了batch,batch默认大小为16K,每个batch中可以存放一条或者多条消息,因为默认是16K,所以如果出现很多消息msg>16K的情况,就不能使用默认大小的已经开辟出内存的batch,需要重新进行系统调用开辟符合大小的batch内存,用完后还要回收,这样既可能造成内存碎片,还有频繁的系统调用,降低性能,所以需要根据msg大小对batch大小进行调整(调优),使得kafkaProducer开辟出符合业务需求的固定大小的batch。kafkaProducer 生产消息-> accumulator累加器(缓冲区)-> dq双向队列(t每个partition对应一个dq,并且对其操作是线程安全的) -> batch。
- Sender:这是一个IO thread,用来将缓冲区缓存的消息发送到broker集群。有一个kafkaProducer的参数定义IO thread发送数据的时间间隔,如果这个参数设置为0,那么缓冲区每写入一个消息,就发送一次,这样的话缓冲区的意义就不存在了,例如可以将这个参数的值设置为10秒,这样的话,每隔10秒,IO thread就会将缓存区缓存的消息发送到broker集群。Sender(IO thread)与kafkaProducer是一对一的关系, 在kafkaProducer的构造方法中会为其生成一个sender(IO thread),还有就是在执行newSender(...)方法的时候,会new Selector,最终调用到java.nio.channels.Selector.open(),使用到了NIO(但是不是使用Netty,是自己实现NIO),要把channel注册到selector。
- broker集群:就是kafka集群,每个broker表示一个kafka单例。
2.Sender相关:
conf.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "1048576"),表示每次Sender(IO thread) 发送出去的数据的最大大小,这1M大小是由多个batch组成的。
(1).accumulator累加器(缓冲区)在累加的数据达到MAX_REQUEST_SIZE_CONFIG之后,就会唤醒IO thread进行发送???不是这样的,kafkaProducer会把消息以topic->partition->dq->batch的方式存放到缓冲区,IO thread每隔一段时间会以MAX_REQUEST_SIZE_CONFIG的大小从缓冲区拉取数据,发送到kafka集群。可以针对MAX_REQUEST_SIZE_CONFIG大小进行调优,但是要同时调整kafka集群接收端的大小。
IO thread在非阻塞的情况下每次发送最大的数据量是1M,但是最小的是多少呢?在阻塞的情况下,每发送一条消息到缓冲区,IO thread会将其发送到kafka集群。
(2).conf.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,"5");表示连续发送5次,但是kafka没返回,那么kafkaProducer就不再发了,这里也可以进行调优。这里是什么情况?
(3). 这两个是Kafka自己设置的默认的TCP(socket)的缓冲区的大小。可以使用netstat - natp指令查看每个网络应用的tcp缓冲区大小。 conf.setProperty(ProducerConfig.SEND_BUFFER_CONFIG,"32768"); //32K -1 conf.setProperty(ProducerConfig.RECEIVE_BUFFER_CONFIG,"32768"); //32k -1k
可以根据需要将其调大,进行调优。
其实操作系统有自己的TCP(socket) IO缓存的大小,所有的网络应用都要用到这个系统缓冲区,如果kafkaProducer需要发送大量数据,但是速度达不到预期,造成一些消息的积压,可以将上面的两个参数都设置为-1(调优),这样的话,就会使用操作系统默认的大小124928.
(4).综上所述,如果业务需要,可以通过设置缓冲区大小、batch大小(缓冲区相关,还有Sender相关的几个参数)来给kafkaProducer进行调优,但是这是有个前提的,那就是只有在ACK被设置为0的时候才会发挥缓冲器的作用,并且还要求用的是非阻塞的方式发送消息。ACK可以被设置为1,0,-1或者all,这4种方式代表不同的意义。
进入到 IO thread.
如果ACK设置为1或-1,那么往batch存一条消息,IO thread会立即将其发送给broker,这样缓冲区就不起作用了。
3.怎样保证消息的有序性:
有序的消息要保存到某个topic的用一个分区,为了保证这点,要将任务交给同一个线程,并且是同一个kafkaProducer,消息的key必须是相同的,kafkaProducer的分区器是根据key为消息选择分区的,如果没有key,那么消息将以轮询的方式分散发送给topic下的所有分区。
2.Kafka消费者:
需要注意的是:
- 每个KafkaConsumer都要指定一个group,这样的话这个consumer因为意外导致重启的时候,还能够继续从原来的offset进行消费;如果不指定具体的group名字,那么意外重启后,会属于一个随机的group,跟原来不一样,无法从原来的offset继续消费,因为offset记录在group中。虽然offset是属于某个partition的,但是根据不同业务的需要,也是按照group来存储的,多个业务消费同一个partition,就需要group,每个业务在不同的group中。
更多推荐
所有评论(0)