目录

1.Kafka生产者原理:

1.基础流程:

2.Sender相关:

3.怎样保证消息的有序性:

2.Kafka消费者:


1.Kafka生产者原理:

1.基础流程:

Kafka生产者生产消息,基本上包括下面4个部分:

  1. 线程:多个线程可以操作同一个kafkaProducer去生产消息,并且可以给多个不同的topic生产消息。
  2. kafkaProducer: kafka消息生产者,具体对应kafka的一个java类,里面包含了partitioner分区器(根据具体消息的key去选择发送到哪个分区,如果没有key则轮询选择分区);accumulator累加器(针对缓冲区进行操作,缓存产生的消息,缓冲区大小可以根据业务调优-调优,默认是32M),还有双向队列dq用来缓存消息(缓冲区中topic->partition),dq中的又包含了batch,batch默认大小为16K,每个batch中可以存放一条或者多条消息,因为默认是16K,所以如果出现很多消息msg>16K的情况,就不能使用默认大小的已经开辟出内存的batch,需要重新进行系统调用开辟符合大小的batch内存,用完后还要回收,这样既可能造成内存碎片,还有频繁的系统调用,降低性能,所以需要根据msg大小对batch大小进行调整(调优),使得kafkaProducer开辟出符合业务需求的固定大小的batch。kafkaProducer 生产消息-> accumulator累加器(缓冲区)-> dq双向队列(t每个partition对应一个dq,并且对其操作是线程安全的) -> batch。
  3. Sender:这是一个IO thread,用来将缓冲区缓存的消息发送到broker集群。有一个kafkaProducer的参数定义IO thread发送数据的时间间隔,如果这个参数设置为0,那么缓冲区每写入一个消息,就发送一次,这样的话缓冲区的意义就不存在了,例如可以将这个参数的值设置为10秒,这样的话,每隔10秒,IO thread就会将缓存区缓存的消息发送到broker集群。Sender(IO thread)与kafkaProducer是一对一的关系, 在kafkaProducer的构造方法中会为其生成一个sender(IO thread),还有就是在执行newSender(...)方法的时候,会new Selector,最终调用到java.nio.channels.Selector.open(),使用到了NIO(但是不是使用Netty,是自己实现NIO),要把channel注册到selector。
  4. broker集群:就是kafka集群,每个broker表示一个kafka单例。

2.Sender相关:

conf.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "1048576"),表示每次Sender(IO thread) 发送出去的数据的最大大小,这1M大小是由多个batch组成的。

(1).accumulator累加器(缓冲区)在累加的数据达到MAX_REQUEST_SIZE_CONFIG之后,就会唤醒IO thread进行发送???不是这样的,kafkaProducer会把消息以topic->partition->dq->batch的方式存放到缓冲区,IO thread每隔一段时间会以MAX_REQUEST_SIZE_CONFIG的大小从缓冲区拉取数据,发送到kafka集群。可以针对MAX_REQUEST_SIZE_CONFIG大小进行调优,但是要同时调整kafka集群接收端的大小。

IO thread在非阻塞的情况下每次发送最大的数据量是1M,但是最小的是多少呢?在阻塞的情况下,每发送一条消息到缓冲区,IO thread会将其发送到kafka集群。

(2).conf.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,"5");表示连续发送5次,但是kafka没返回,那么kafkaProducer就不再发了,这里也可以进行调优。这里是什么情况?

(3).
这两个是Kafka自己设置的默认的TCP(socket)的缓冲区的大小。可以使用netstat - natp指令查看每个网络应用的tcp缓冲区大小。
conf.setProperty(ProducerConfig.SEND_BUFFER_CONFIG,"32768");  //32K   -1
conf.setProperty(ProducerConfig.RECEIVE_BUFFER_CONFIG,"32768"); //32k  -1k

 可以根据需要将其调大,进行调优

其实操作系统有自己的TCP(socket) IO缓存的大小,所有的网络应用都要用到这个系统缓冲区,如果kafkaProducer需要发送大量数据,但是速度达不到预期,造成一些消息的积压,可以将上面的两个参数都设置为-1(调优),这样的话,就会使用操作系统默认的大小124928.

(4).综上所述,如果业务需要,可以通过设置缓冲区大小、batch大小(缓冲区相关,还有Sender相关的几个参数)来给kafkaProducer进行调优,但是这是有个前提的,那就是只有在ACK被设置为0的时候才会发挥缓冲器的作用,并且还要求用的是非阻塞的方式发送消息。ACK可以被设置为1,0,-1或者all,这4种方式代表不同的意义

进入到 IO thread. 

如果ACK设置为1或-1,那么往batch存一条消息,IO thread会立即将其发送给broker,这样缓冲区就不起作用了。

3.怎样保证消息的有序性:

有序的消息要保存到某个topic的用一个分区,为了保证这点,要将任务交给同一个线程,并且是同一个kafkaProducer,消息的key必须是相同的,kafkaProducer的分区器是根据key为消息选择分区的,如果没有key,那么消息将以轮询的方式分散发送给topic下的所有分区。

2.Kafka消费者:

需要注意的是:

  1. 每个KafkaConsumer都要指定一个group,这样的话这个consumer因为意外导致重启的时候,还能够继续从原来的offset进行消费;如果不指定具体的group名字,那么意外重启后,会属于一个随机的group,跟原来不一样,无法从原来的offset继续消费,因为offset记录在group中。虽然offset是属于某个partition的,但是根据不同业务的需要,也是按照group来存储的,多个业务消费同一个partition,就需要group,每个业务在不同的group中
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐