kafka超全学习总结
四. Kafka快速入门选择Kafka原因 :Kafka吞吐量比较高 , 在我们的系统中, 文章是核心数据, 每天会发布很多文章数据, 产生很多用户行为因为我们的系统中会有一个实时行为计算的逻辑, 会使用KAFKA提供的实时流式计算功能因为我们团队中之前有小伙伴用过Kafka , 积累了一些使用经验, 所以我们再用学习成本和维护成本会比较低MQ使用场景 :系统异步调用 (解耦 ,故障隔离 , 削峰
四. Kafka快速入门
选择Kafka原因 :
- Kafka吞吐量比较高 , 在我们的系统中, 文章是核心数据, 每天会发布很多文章数据, 产生很多用户行为
- 因为我们的系统中会有一个实时行为计算的逻辑, 会使用KAFKA提供的实时流式计算功能
- 因为我们团队中之前有小伙伴用过Kafka , 积累了一些使用经验, 所以我们再用学习成本和维护成本会比较低
MQ使用场景 :
- 系统异步调用 (解耦 , 故障隔离 , 削峰填谷 , 提速 )
- 系统日志采集
4.1 kafka概述和安装
4.1.1 kafka概述
消息中间件对比
特性 | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
---|---|---|---|---|
开发语言 | java | erlang | java | scala |
单机吞吐量 | 万级 | 万级 | 10万级 | 100万级 |
时效性 | ms | us | ms | ms级以内 |
可用性 | 高(主从) | 高(主从) | 非常高(分布式) | 非常高(分布式) |
功能特性 | 成熟的产品、较全的文档、各种协议支持好 | 并发能力强、性能好、延迟低 | MQ功能比较完善,扩展性佳 | 只支持主要的MQ功能,主要应用于大数据领域 |
消息中间件对比-选择建议
消息中间件 | 建议 |
---|---|
Kafka | 追求高吞吐量,适合产生大量数据的互联网服务的数据收集业务 |
RocketMQ | 可靠性要求很高的金融互联网领域,稳定性高,经历了多次阿里双11考验 |
RabbitMQ | 性能较好,社区活跃度高,数据量没有那么大,优先选择功能比较完备的RabbitMQ |
kafka介绍
Kafka 是一个分布式流媒体平台,类似于消息队列或企业消息传递系统。kafka官网:http://kafka.apache.org/
kafka介绍-名词解释
-
producer:发布消息的对象称之为主题生产者(Kafka topic producer)
-
topic:Kafka将消息分门别类,每一类的消息称之为一个主题(Topic)
kafka消息都是发送到主题中
-
consumer:订阅消息并处理发布的消息的对象称之为主题消费者(consumers)
-
broker:已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker)。 消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。
4.1.2 kafka安装配置
Kafka对于zookeeper是强依赖,保存kafka相关的节点数据,所以安装Kafka之前必须先安装zookeeper
- Docker安装zookeeper
下载镜像:
docker pull zookeeper:3.4.14
创建容器
docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14
- Docker安装kafka
下载镜像:
docker pull wurstmeister/kafka:2.12-2.3.1
创建容器
docker run -d --name kafka \
--env KAFKA_ADVERTISED_HOST_NAME=192.168.200.130 \
--env KAFKA_ZOOKEEPER_CONNECT=192.168.200.130:2181 \
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.200.130:9092 \
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
--env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
--net=host wurstmeister/kafka:2.12-2.3.1
4.2 kafka快速入门
- 生产者发送消息,多个消费者只能有一个消费者接收到消息
- 生产者发送消息,多个消费者都可以接收到消息
4.2.1 创建项目
4.2.2 导入依赖
<dependencies>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
4.2.3 发送消息
在kafka-producer项目中编写生产者代码发送消息 , 创建application.yml配置文件, 配置Kafka连接信息
spring:
application:
name: kafka-producer
kafka:
bootstrap-servers: 118.25.197.221:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
编写代码发送消息
package com.heima.kafka;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.FailureCallback;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.SuccessCallback;
import java.util.concurrent.ExecutionException;
@SpringBootTest
public class KafkaProducerTest {
@Autowired
private KafkaTemplate kafkaTemplate;
@Test
public void testSend() throws ExecutionException, InterruptedException {
kafkaTemplate.send("kafka.topic.my-topic1", "kafka", "hello kafka !");
}
}
4.2.4 接收消息
在kafka-consumer项目中编写消费者代码接收消息 , 创建application.yml配置文件, 配置Kafka连接信息
spring:
application:
name: kafka-producer
kafka:
bootstrap-servers: 118.25.197.221:9092
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
创建监听器类, 监听kafka消息
@Component
public class KafkaConsumerListener {
@KafkaListener(topics = "kafka.topic.my-topic1", groupId = "group1")
public void listenTopic1group1(ConsumerRecord<String, String> record) {
String key = record.key();
String value = record.value();
System.out.println("group1中的消费者接收到消息:" + key + " : " + value+"));
}
}
4.3 kafka高性能设计(面试)
跟RabbitMQ不同的是, Kafka中的消息都是基于磁盘存储
Kafka 是基于磁盘进行存储的,但 Kafka 官方又称其具有高性能、高吞吐、低延时的特点,其吞吐量动辄几十上百万。那么 Kafka 又是怎么做到其吞吐量动辄几十上百万的呢?
Kafka 高性能,是多方面协同的结果,包括宏观架构、分布式 partition 存储、ISR 数据同步、以及“无所不用其极”的高效利用磁盘、操作系统特性等。
总结一下其实就是五个要点
- 顺序读写
- 消息分区
- 页缓存
- 零拷贝
- 消息压缩
- 分批发送
4.3.1 顺序读写
磁盘读写有两种方式:顺序读写或者随机读写。在顺序读写的情况下,磁盘的顺序读写速度和内存持平
为了提高读写硬盘的速度,Kafka就是使用顺序读写。规避了磁盘寻址 , 因此效率非常高。
Kafka底层数据存储
在 Kafka 的文件存储中,同一个 Topic 下有多个不同的 Partition,每个 Partition 都为一个目录,而每一个目录又被平均分配成多个大小相等的 Segment File 中,Segment File 又由 index file 和 data file 组成,他们总是成对出现,后缀 “.index” 和 “.log” 分表表示 Segment 索引文件和数据文件
4.3.2 消息分区
Kafka 对于数据的读写是以分区
为粒度的,分区可以分布在多个主机(Broker)中,这样每个节点能够实现独立的数据写入和读取,并且能够通过增加新的节点来增加 Kafka 集群的吞吐量,通过分区部署在多个 Broker 来实现负载均衡
的效果
配置Kafka分区以及分区数量
@Configuration
public class KafkaConfig {
@Bean
public NewTopic topic1() {
return TopicBuilder.name("kafka.topic2").partitions(2).build();
}
}
Kafka 的分区策略指的就是将生产者发送到哪个分区的算法。Kafka 为我们提供了默认的分区策略,同时它也支持你自定义分区策略
分区策略 | 说明 |
---|---|
轮询策略 | 按顺序轮流将每条数据分配到每个分区中 |
随机策略 | 每次都随机地将消息分配到每个分区 |
按键保存策略 | 生产者发送数据的时候,可以指定一个key,计算这个key的hashCode值,按照hashCode的值对不同消息进行存储 |
- 发送消息的时候, 可以手动指定分区编号 , 消息会发送到对应的分区
- 发送消息的时候, 传递了消息key值, 默认会根据key进行hash计算, 用hash值%分区数量, 得到的结果就是分区编号
- 发送消息的时候, 没有指定消息key值, 采用轮询分区策略 , 按顺序轮流将每条数据分配到每个分区中
4.3.3 页缓存
我们知道文件一般存放在硬盘(机械硬盘或固态硬盘)中,CPU 并不能直接访问硬盘中的数据,而是需要先将硬盘中的数据读入到内存中,然后才能被 CPU 访问
由于读写硬盘的速度比读写内存要慢很多(DDR4 内存读写速度是机械硬盘500倍,是固态硬盘的200倍),所以为了避免每次读写文件时,都需要对硬盘进行读写操作,Linux 内核使用 页缓存(Page Cache)
机制来对文件中的数据进行缓存
什么是页缓存 ?
为了提升对文件的读写效率,Linux 内核会以页大小(4KB)为单位,将文件划分为多数据块。当用户对文件中的某个数据块进行读写操作时,内核首先会申请一个内存页(称为 页缓存
)与文件中的数据块进行绑定
- 当从文件中读取数据时,如果要读取的数据所在的页缓存已经存在,那么就直接把页缓存的数据拷贝给用户即可。否则,内核首先会申请一个空闲的内存页(页缓存),然后从文件中读取数据到页缓存,并且把页缓存的数据拷贝给用户。
- 当向文件中写入数据时,如果要写入的数据所在的页缓存已经存在,那么直接把新数据写入到页缓存即可。否则,内核首先会申请一个空闲的内存页(页缓存),然后从文件中读取数据到页缓存,并且把新数据写入到页缓存中。对于被修改的页缓存,内核会定时把这些页缓存刷新到文件中
Kafka 中消息先被写入页缓存,由操作系统负责刷盘任务 , 这是 Kafka 实现高吞吐的重要因素之一
4.3.4 零拷贝
Kafka中存在大量的网络数据持久化到磁盘(Producer到Broker)和磁盘文件通过网络发送(Broker到Consumer)的过程。这一过程的性能直接影响到Kafka的整体性能。Kafka采用零拷贝这一通用技术解决该问题。
零拷贝技术可以减少数据拷贝和共享总线操作的次数,消除传输数据在存储器之间不必要的中间拷贝次数,减少用户应用程序地址空间和操作系统内核地址空间之间因为上下文切换而带来的开销,从而有效地提高数据传输效率 !
传统读取数据流程:
什么是 DMA 技术?简单理解就是,在进行 I/O 设备和内存的数据传输的时候,数据搬运的工作全部交给 DMA 控制器,而 CPU 不再参与任何与数据搬运相关的事情,这样 CPU 就可以去处理别的事务
整个复制期间共发生了 4 次用户态与内核态的上下文切换,因为发生了两次系统调用,一次是
read()
,一次是write()
,每次系统调用都得先从用户态切换到内核态,等内核完成任务后,再从内核态切换回用户态。上下文切换到成本并不小,一次切换需要耗时几十纳秒到几微秒,虽然时间看上去很短,但是在高并发的场景下,这类时间容易被累积和放大,从而影响系统的性能。
其次,还发生了 4 次数据拷贝,其中两次是 DMA 的拷贝,另外两次则是通过 CPU 拷贝的
- 第一次拷贝,把磁盘上的数据拷贝到操作系统内核的缓冲区里,这个拷贝的过程是通过 DMA 搬运的。
- 第二次拷贝,把内核缓冲区的数据拷贝到用户的缓冲区里,这个拷贝到过程是由 CPU 完成的。
- 第三次拷贝,把刚才拷贝到用户的缓冲区里的数据,再拷贝到内核的 socket 的缓冲区里,这个过程依然还是由 CPU 搬运的。
- 第四次拷贝,把内核的 socket 缓冲区里的数据,拷贝到网卡的缓冲区里,这个过程又是由 DMA 搬运的。
我们回过头看这个文件传输的过程,我们只是搬运一份数据,结果却搬运了 4 次,过多的数据拷贝无疑会消耗 CPU 资源,大大降低了系统性能
零拷贝流程
从 Linux 内核
2.4
版本开始起,对于支持网卡支持 SG-DMA 技术的情况下,sendfile()
系统调用的过程发生了点变化,具体过程如下:
- 第一步,通过 DMA 将磁盘上的数据拷贝到内核缓冲区里;
- 第二步,缓冲区描述符和数据长度传到 socket 缓冲区,这样网卡的 SG-DMA 控制器就可以直接将内核缓存中的数据拷贝到网卡的缓冲区里,此过程不需要将数据从操作系统内核缓冲区拷贝到 socket 缓冲区中
零拷贝技术的文件传输方式相比传统文件传输的方式,减少了 2 次上下文切换和数据拷贝次数,只需要 2 次上下文切换和数据拷贝次数,就可以完成文件的传输,而且 2 次的数据拷贝过程,都不需要通过 CPU,2 次都是由 DMA 来搬运。
所以,总体来看,零拷贝技术可以把文件传输的性能提高至少一倍以上
什么是零拷贝 ?
- 传统方式下通过网络传输数据, 需要进过4次copy , 磁盘—> 操作系统内核缓存 ----> 进程缓冲区 —> socket缓冲区 —> 网卡----> 传输给其他用户
- 零拷贝是Linux2.4版本之后的技术, 是SG-DMA技术, 实现数据复制传输, 传输过程只用经过二次数据copy , 磁盘 ----> 操作系统内核缓存 ----> 通过SG-DMA组件搬运数据到网卡, 节省了2次数据拷贝带来的系统开销, 所以性能比较高
4.3.5 消息压缩
消息压缩顾名思义就是将消息内容压缩之后传输 , 可以有效节省带宽, 提高消息发送的效率
默认情况下, 消息发送时不会被压缩。我们可以在发送消息的时候配置压缩算法
代码中配置方式:
spring:
application:
name: kafka-producer
kafka:
bootstrap-servers: 118.25.197.221:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
compression-type: gzip # 消息压缩算法
压缩算法 | 说明 |
---|---|
snappy | 占用较少的 CPU, 却能提供较好的性能和相当可观的压缩比, 如果看重性能和网络带宽,建议采用 |
lz4 | 占用较少的 CPU, 压缩和解压缩速度较快,压缩比也很客观 |
gzip | 占用较多的 CPU,但会提供更高的压缩比,网络带宽有限,可以使用这种算法 |
使用压缩可以降低网络传输开销和存储开销,而这往往是向 Kafka 发送消息的瓶颈所在。
4.3.6 分批发送
生产者发送多个消息到同一个分区的时候,为了减少网络带来的系能开销,kafka会对消息进行批量发送
batch.size : 通过这个参数来设置批量提交的数据大小,默认是16k,当积压的消息达到这个值的时候就会统一发送(发往同一分区的消息)
spring:
application:
name: kafka-producer
kafka:
bootstrap-servers: 118.25.197.221:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
retries: 10 # 重试次数
compression-type: gzip # 消息压缩算法
batch-size: 16KB #批量提交的数据大小
4.4 kafka高可用设计(面试)
4.4.1 集群架构
-
Kafka 的服务器端由被称为 Broker 的服务进程构成,即一个 Kafka 集群由多个 Broker 组成
-
这样如果集群中某一台机器宕机,其他机器上的 Broker 也依然能够对外提供服务。这其实就是 Kafka 提供高可用的手段之一
4.4.2 备份机制(Replication)
对分区进行复制
Kafka 中消息的备份又叫做 副本(Replica)
Kafka 定义了两类副本:
-
领导者副本(Leader Replica)
-
追随者副本(Follower Replica)
当领导者副本所在节点宕机之后, kafka会从最随着副本中选取一个副本升级为领导者副本, 对外提供数据读写服务
选取的是数据同步度最高的追随者副本
4.4.3 消费者组和再均衡
4.4.3.1 消费者组
消费者组(Consumer Group)
是由一个或多个消费者实例(Consumer Instance)组成的群组,具有可扩展性和可容错性的一种机制。消费者组内的消费者共享
一个消费者组ID,这个ID 也叫做 Group ID
,组内的消费者共同对一个主题进行订阅和消费,同一个组中只能够由一个消费者去消费某一个分区的数据,多余的消费者会闲置,派不上用场。
同一个组中只能够由一个消费者去消费某一个分区的数据
一个消费者可以消费多个分区
一个分区可以被不同组中的消费者进行消费
我们在上面提到了两种消费方式
-
一个消费者群组消费一个主题中的消息,这种消费模式又称为
点对点
的消费方式,点对点的消费方式又被称为消息队列 -
一个主题中的消息被多个消费者群组共同消费,这种消费模式又称为
发布-订阅
模式
点对点模式 : 多个消费者在同一个组中, 这样同一个组中只能有有个消费者消费同一个分区的数据就是点对点模式
发布-订阅模式 : 多个消费者处于不同的组 , 这样不同组中的消费者都能消费同一个分区的数据就是发布-订阅模式
@Component
public class KafkaConsumerListener {
@KafkaListener(topics = "kafka.topic.my-topic1",groupId = "group1")
public void listenTopic1group1(ConsumerRecord<String, String> record) {
String key = record.key();
String value = record.value();
System.out.println("group1中的消费者接收到消息:"+key + " : " + value);
}
@KafkaListener(topics = "kafka.topic.my-topic1",groupId = "group2")
public void listenTopic1group2(ConsumerRecord<String, String> record) {
String key = record.key();
String value = record.value();
System.out.println("group2中的消费者接收到消息:"+key + " : " + value);
}
}
如果想实现点对点消息(一个消息只能被一个消费者消费) , 让所有消费者在同一个组中即可
如果想实现发布订阅消息(一个消息可以被多个消费者消费) , 让消费者在不同的组中即可
4.4.3.2 再均衡
再均衡就是指 当消费者组中的消费者发生变更的时候(新增消费者, 消费者宕机) , 重新为消费者分配消费分区的过程
当消费者组中重新加入消费者 , 或者消费者组中有消费者宕机 , 这个时候Kafka会为消费者组中的消费者从新分配消费分区的过程就是再均衡
再均衡过程中,Kafka是不对外提供服务
重平衡(再均衡)非常重要,它为消费者群组带来了高可用性
和 伸缩性
,我们可以放心的添加消费者或移除消费者,不过在正常情况下我们并不希望发生这样的行为。在重平衡期间,消费者无法读取消息,造成整个消费者组在重平衡的期间都不可用 , 并且在发送再均衡的时候有可能导致消息的丢失和重复消费
4.5 kafka生产者详解(面试)
4.5.1 发送类型
-
同步发送
使用send()方法发送,它会返回一个Future对象,调用get()方法进行等待,就可以知道消息是否发送成功
@Test public void testSend() throws ExecutionException, InterruptedException { 同步发送 SendResult result = (SendResult) kafkaTemplate.send("kafka.topic.my-topic1", "kafka", "hello kafka").get(); System.out.println(result.getRecordMetadata().offset()); }
-
异步发送
调用send()方法,并指定一个回调函数,服务器在返回响应时调用函数
@Test public void testSend() throws ExecutionException, InterruptedException { //异步发送 ListenableFuture future = kafkaTemplate.send("kafka.topic.my-topic1", "kafka", "hello kafka"); future.addCallback(result -> { //消息发送成功执行 SendResult sendResult = (SendResult) result; System.out.println(sendResult.getRecordMetadata().offset()); }, throwable -> { //消息发送失败执行 System.out.println("发送消息出现异常:" + throwable); }); Thread.sleep(1000); }
4.5.2 参数详解
Kafka消息可靠性 , 保证发送消息不丢失
- acks
代码的配置方式:
spring:
application:
name: kafka-producer
kafka:
bootstrap-servers: 118.25.197.221:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
retries: 10 # 重试次数
compression-type: gzip # 消息压缩算法
batch-size: 16KB #批量提交的数据大小
acks: all # 消息确认机制 0: 不签收 , 1 : leader签收 , all : leader和follower都签收
参数的选择说明
确认机制 | 说明 |
---|---|
acks=0 | 生产者在成功写入消息之前不会等待任何来自服务器的响应,消息有丢失的风险,但是速度最快 |
acks=1(默认值) | 只要集群首领节点收到消息,生产者就会收到一个来自服务器的成功响应 |
acks=all | 只有当所有参与赋值的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应 |
追求极致的吞吐量和性能使用 acks=0
追求是数据安全, 消息发送不丢失 , acks=all
既要吞吐量也要可靠性 : acks=1 (折中方案)
- retries
生产者从服务器收到的错误有可能是临时性错误,在这种情况下,retries参数的值决定了生产者可以重发消息的次数,如果达到这个次数,生产者会放弃重试返回错误,默认情况下,生产者会在每次重试之间等待100ms
代码中配置方式:
spring:
application:
name: kafka-producer
kafka:
bootstrap-servers: 118.25.197.221:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
retries: 10 # 重试次数
为了提高消息投递的成功率, 可以将重试次数设为一个很大的值 , 例如 : 99999999
4.6 kafka消费者详解(面试)
4.6.1 消费者组
-
消费者组(Consumer Group) :指的就是由一个或多个消费者组成的群体
-
一个发布在Topic上消息被分发给此消费者组中的一个消费者
-
所有的消费者都在一个组中,那么这就变成了queue模型
-
所有的消费者都在不同的组中,那么就完全变成了发布-订阅模型
-
4.6.2 消息有序性
应用场景:
-
即时消息中的单对单聊天和群聊,保证发送方消息发送顺序与接收方的顺序一致
-
充值转账两个渠道在同一个时间进行余额变更,短信通知必须要有顺序
topic分区中消息只能由消费者组中的唯一一个消费者处理,所以消息肯定是按照先后顺序进行处理的。但是它也仅仅是保证Topic的一个分区顺序处理,不能保证跨分区的消息先后处理顺序。 所以,如果你想要顺序的处理Topic的所有消息,那就只提供一个分区。
4.6.3 提交和偏移量
kafka不会像其他JMS队列那样需要得到消费者的确认,消费者可以使用offset(偏移量)来追踪消息在分区的位置
消费者会往一个叫做_consumer_offset的特殊主题发送消息,消息里包含了每个分区的偏移量。如果消费者发生崩溃或有新的消费者加入群组,就会触发再均衡
正常的情况
如果消费者2挂掉以后,会发生再均衡,消费者2负责的分区会被其他消费者进行消费
再均衡后不可避免会出现一些问题
问题一:
如果提交偏移量小于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息就会被重复处理。
问题二:
如果提交的偏移量大于客户端的最后一个消息的偏移量,那么处于两个偏移量之间的消息将会丢失。
如果想要解决这些问题,还要知道目前kafka提交偏移量的方式:
提交偏移量的方式有两种,分别是自动提交偏移量和手动提交
- 自动提交偏移量
当enable.auto.commit
被设置为true,提交方式就是让消费者自动提交偏移量,每隔5秒消费者会自动把从poll()方法接收的最大偏移量提交上去
-
手动提交 ,当
enable.auto.commit
被设置为false可以有以下三种提交方式-
提交当前偏移量(同步提交)
-
异步提交
-
同步和异步组合提交
-
1.提交当前偏移量(同步提交)
把enable.auto.commit
设置为false,让应用程序决定何时提交偏移量。使用commitSync()提交偏移量,commitSync()将会提交poll返回的最新的偏移量,所以在处理完所有记录后要确保调用了commitSync()方法。否则还是会有消息丢失的风险。
只要没有发生不可恢复的错误,commitSync()方法会一直尝试直至提交成功,如果提交失败也可以记录到错误日志里。
@KafkaListener(topics = "kafka.topic.my-topic1", groupId = "group1")
public void listenTopic1group1(KafkaConsumer consumer, ConsumerRecord<String, String> record) {
String key = record.key();
String value = record.value();
System.out.println("group1中的消费者接收到消息:" + key + " : " + value+",偏移量:"+record.offset());
//同步提交偏移量
consumer.commitSync();
}
2.异步提交
手动提交有一个缺点,那就是当发起提交调用时应用会阻塞。当然我们可以减少手动提交的频率,但这个会增加消息重复的概率(和自动提交一样)。另外一个解决办法是,使用异步提交的API。
@KafkaListener(topics = "kafka.topic.my-topic1", groupId = "group1")
public void listenTopic1group1(KafkaConsumer consumer, ConsumerRecord<String, String> record) {
String key = record.key();
String value = record.value();
System.out.println("group1中的消费者接收到消息:" + key + " : " + value+",偏移量:"+record.offset());
//异步提交偏移量
consumer.commitAsync();
}
3.同步和异步组合提交
异步提交也有个缺点,那就是如果服务器返回提交失败,异步提交不会进行重试。相比较起来,同步提交会进行重试直到成功或者最后抛出异常给应用。异步提交没有实现重试是因为,如果同时存在多个异步提交,进行重试可能会导致位移覆盖。
@KafkaListener(topics = "kafka.topic.my-topic1", groupId = "group1")
public void listenTopic1group1(KafkaConsumer consumer, ConsumerRecord<String, String> record) {
String key = record.key();
String value = record.value();
System.out.println("group1中的消费者接收到消息:" + key + " : " + value+",偏移量:"+record.offset());
//同步异步, 结合提交
try {
consumer.commitAsync();
} catch (Exception e) {
e.printStackTrace();
}finally {
consumer.commitSync();
}
}
更多推荐
所有评论(0)