kafka消息队列基本概念与关键原理
Kafka 概述:Kafka是最初由Linkedin公司开发,用scala语言编写的,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统。它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、web日志、消息服务等。它类似于JMS(Java Messga...
·
Kafka
概述:
Kafka是最初由Linkedin公司开发,用scala语言编写的,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统。
它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、web日志、消息服务等。
它类似于JMS(Java Messgae System),但是他不是JMS规范的实现。
JMS规范的消息系统中消息收发分类:
点对点/队列模式,一般基于拉取或轮询来接收消息,在这种模式中,可以有多个消费者同时在queue(队列)中侦听同一消息,但一条消息只会被一个消费者接收。既支持即发即弃的消息传送方式也支持同步请求/应答的消息传送方式。
发布/订阅模式,既可以支持推送来接收消息,也可以通过拉取或轮询的形式来接收。发布到一个topic(主题)的消息可以被多个订阅者接收,解耦能力更强。
Kafka的消息系统组成:
kafka对消息保存时根据Topic进行归类,发送消息者成为Producer,消息接受者成为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)成为broker(中介,拉皮条的)。无论是kafka集群,还是producer和consumer都依赖于zookeeper来保证系统可用性,zookeeper保存kafka集群的meta(元数据)信息(broker/consumer)。
其他消息队列简单类比
RabbitMQ: Erlang编写,支持多协议AMQP,XMPP,SMTP,STOMP。支持负载均衡、数据持久化。同时支持Peer-to-Peer和发布/订阅模式
Redis:基于Key-Value对的NoSQL数据库,同时支持MQ功能,可做轻量级队列服务使用。就入队操作而言,Redis对短消息(小于10KB)的性能比RabbitMQ好,长消息的性能比RabbitMQ差。
ZeroMQ:轻量级,不需要单独的消息服务器或中间件,应用程序本身扮演该角色,Peer-to-Peer。它实质上是一个库,需要开发人员自己组合多种技术,使用复杂度高
ActiveMQ:JMS实现,Peer-to-Peer,支持持久化、XA事务
Kafka/Jafka:高性能跨语言的分布式发布/订阅消息系统,数据持久化,全分布式,同时支持在线和离线处理
MetaQ/RocketMQ:纯Java实现,发布/订阅消息系统,支持本地事务和XA分布式事务
消息队列适用场景和设计目标:
场景:
解耦 各位系统之间通过消息系统这个统一的接口交换数据,无须了解彼此的存在
冗余 部分消息系统具有消息持久化能力,可规避消息处理前丢失的风险
扩展 消息系统是统一的数据接口,各系统可独立扩展
峰值处理能力 消息系统可顶住峰值流量,业务系统可根据处理能力从消息系统中获取并处理对应量的请求
可恢复性 系统中部分组件失效并不会影响整个系统,它恢复后仍然可从消息系统中获取并处理数据
异步通信 在不需要立即处理请求的场景下,可以将请求放入消息系统,合适的时候再处理
我大概比喻下Kafka最基本的消息队列使用场景
Kafka是生产者和消费者之间的中间件
在生活中比喻---
妈妈:生产者
你:消费者
馒头:数据流、消息
正常情况下: 生产一个 消费一个(效率低下)
其他情况:
1)一直生产,你吃到某一个馒头时,你噎死了(机器故障), 馒头就没人要了(数据丢失了)。
2)一直生产,做馒头速度快,你来不及吃完,馒头也就丢失了
为了防止上述“其他情况”的出现,我们可以拿个碗/篮子,馒头做好以后先放到篮子里,你要吃的时候去篮子里面取出来吃,而这篮子/框就可以为:Kafka。当篮子满了,馒头就装不下了,咋办? 多准备几个篮子 === Kafka的扩容。
设计目标:
高吞吐率:在廉价的商用机器上单机可支持每秒100万条消息的读写。
消息持久化:所有消息均被持久化到磁盘,无消息丢失,支持消息重放。
完全分布式:Producer,Broker,Consumer均支持水平扩展。
同时适应在线流处理和离线批处理。
Kafka的架构和关键概念:
架构:
分布式(负载均衡提高处理性能和容错能力)
Kafka以集群的方式运行,可以由一个或多个kafka服务(server)组成,每个服务叫做一个broker. 生产者通过网络将消息发送到Kafka集群,集群向消费者提供消息。
每个partition(分区)在Kafka集群的若干服务(server)中都有副本/备份,这样这些持有副本的服务可以共同处理数据和请求,副本数量是可以配置的。
每个分区有一个leader服务器,若干个followers服务器,leader负责处理消息的读和写,followers则去复制leader,如果leader down了,followers中的一台则会自动成为leader。
关键概念:
生产者Producer: 将向Kafka topic发布消息的程序成为producers.
消费者Consumer:将订阅topics并消费消息的程序成为consumer.
中介,拉皮条的broker:集群中的每个kafka服务实例。
Topic:
这是一个逻辑概念,可以理解为消息的主题或分类,每条消息都属于且只属于一个topic,生产者在发送消息时、消费者在接收消息时都必须指定消息的topic。一个topic中可以包含多个partition(分区)。
Partition:
这是一个物理概念,可以理解为一个partition物理上对应一个文件夹。一个partition只分布于一个broker上(不考虑备份的情况下),每个partition都是一个有序队列,分为多个大小相等的segment(对用户透明),每个segment对应一个文件,而segment文件由一条条不可变的记录组成(消息发出后就不可变更了),这里面的数据记录就是生产者发送的消息内容。
删除消息:
注意,消息记录只会被append到segment中,每条消息不会被单独删除或修改。当需要删除过期日志时,直接删除某一个segment。
删除可以通过配置定时删除,或者周期性检测记录文件大小来删除过大文件,比如:1.每两天定时删除每个分区中最老的segment。2.每12小时扫描一次,如果某个分区中的文件大小超过1G,则删除其中最老的一个segment。
文件组织和命名:
Partition在物理上一般以topic名+从0开始的有序序列命名,第一个partiton序号从0开始,序号最大值为partitions数量减1
每条消息有一个连续有序独享的序列号offset来标记消息位置,
segment文件命名规则:partion全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值。数值最大为64位long大小,19位数字字符长度,没有数字用0填充。
概念范围:
kafka集群à单个brokeràtopicàpartitionàsegmentà单条消息记录
消息发布模式:
对于kafka来说,发布消息通常有两种模式:
队列模式(queuing)和发布-订阅模式(publish-subscribe)。
队列模式中,consumers可以同时从服务端读取消息,每个消息只被其中一个consumer读到;
发布-订阅模式中消息被广播到所有的consumer,Consumers可以加入一个consumer 组,共同竞争一个topic,topic中的消息将被分发到组中的一个成员中。同一组中的consumer可以在不同的程序中,也可以在不同的机器上。
如果所有的consumer都在一个组中,这就成为了传统的队列模式,在各consumer中实现负载均衡。
如果所有的consumer都不在不同的组中,这就成为了发布-订阅模式,所有的消息都被分发到所有的consumer中。
更常见的是,每个topic都有若干数量的consumer组,每个组都是一个逻辑上的“订阅者”,为了容错和更好的稳定性,每个组由若干consumer组成。这其实就是一个发布-订阅模式,只不过订阅者是个组而不是单个consumer。
相比传统的消息系统,Kafka可以很好的保证有序性。
传统的队列在服务器上保存有序的消息,如果多个consumers同时从这个服务器消费消息,服务器就会以消息存储的顺序向consumer分发消息。虽然服务器按顺序发布消息,但是消息是被异步的分发到各consumer上,所以当消息到达时可能已经失去了原来的顺序,这意味着并发消费将导致顺序错乱。为了避免故障,这样的消息系统通常使用“专用consumer”的概念,其实就是只允许一个消费者消费消息,当然这就意味着失去了并发性。
在这方面Kafka做的更好,通过分区的概念,Kafka可以在多个consumer组并发的情况下提供较好的有序性和负载均衡。将每个分区分只分发给一个consumer组,这样一个分区就只被这个组的一个consumer消费,就可以顺序的消费这个分区的消息。因为有多个分区,依然可以在多个consumer组之间进行负载均衡。注意consumer组的数量不能多于分区的数量,也就是有多少分区就允许多少并发消费。
Kafka只能保证一个分区之内消息的有序性,在不同的分区之间是不可以的,这已经可以满足大部分应用的需求。如果需要topic中所有消息的有序性,那就只能让这个topic只有一个分区,当然也就只有一个consumer组消费它。
kafka操作相关
消息分区partitioner:可以通过写一个类实现partitioner接口来定义消息分区规则,(配置producer时会引用该类来使用规则)
举例
HashPartitioner哈希规则
RoundRobin轮询调度规则
Kafka消息发送方式:
同步Sync 异步Async 单次OneWay
kafka有同步(sync)、异步(async)以及oneway这三种发送方式,
同步和异步由配置参数producer.type指定,
Oneway由配置参数request.require.acks指定。
官方文档中的注释:producer.type的默认值是sync,即同步的方式。这个参数指定了在后台线程中消息的发送方式是同步的还是异步的。如果设置成异步的模式,可以允许生产者以batch的形式push数据,这样会极大的提高broker的性能,但是这样会增加丢失数据的风险。
以batch的方式推送数据可以极大的提高处理效率,kafka producer可以将消息在内存中累计到一定数量后作为一个batch发送请求。batch的数量大小可以通过producer的参数(batch.num.messages)控制。通过增加batch的大小,可以减少网络请求和磁盘IO的次数,当然具体参数设置需要在效率和时效性方面做一个权衡。在比较新的版本中还有batch.size这个参数。
异步模式还有四个配置参数:
queue.buffering.max.ms | 默认5000 | 启用异步模式时,producer缓存消息的时间。比如我们设置成1000时,它会缓存1s的数据再一次发送出去,这样可以极大的增加broker吞吐量,但也会造成时效性的降低。 |
queue.buffering.max.messages | 默认10000 | 启用异步模式时,producer缓存队列里最大缓存的消息数量,如果超过这个值,producer就会阻塞或者丢掉消息。 |
queue.enqueue.timeout.ms | 默认-1 | 当达到上面参数时producer会阻塞等待的时间。如果设置为0,buffer队列满时producer不会阻塞,消息直接被丢掉;若设置为-1,producer会被阻塞,不会丢消息。 |
batch.num.messages | 默认200 | 启用异步模式时,一个batch缓存的消息数量。达到这个数值时,producer才会发送消息。(每次批量发送的数量) |
producers可以一步的并行向kafka发送消息,但是通常producer在发送完消息之后会得到一个响应,返回的是offset值或者发送过程中遇到的错误。这其中有个非常重要的参数“request.required.acks”,这个参数决定了producer要求leader partition收到确认的副本个数,如果acks设置为0,表示producer不会等待broker的响应,所以,producer无法知道消息是否发生成功,这样有可能导致数据丢失,但同时,acks值为0会得到最大的系统吞吐量。
若acks设置为1,表示producer会在leader partition收到消息时得到broker的一个确认,这样会有更好的可靠性,因为客户端会等待知道broker确认收到消息。若设置为
-1,producer会在所有备份的partition收到消息时得到broker的确认,这个设置可以得到最高的可靠性保证。
其中,在异步发送方式时将request.required.acks参数值设置为0,即为启用了oneway发送方式,简单来说就是只顾消息发送出去不管死活,不关心是否被成功接收,但是低延迟,高吞吐,适配于对可靠性基本没有要求的情景。
一般配置:
对于sync的发送方式:
producer.type=sync
request.required.acks=1
对于async的发送方式:
producer.type=async
request.required.acks=1
queue.buffering.max.ms=5000
queue.buffering.max.messages=10000
queue.enqueue.timeout.ms = -1
batch.num.messages=200
对于oneway的发送发送:
producer.type=async
request.required.acks=0
Producer相关:
Kafka有四个核心的API
1.ProducerAPI:允许一个应用向一个或多个topic里发布记录流;
2.ConsumerAPI:允许一个应用订阅一个或多个topics,处理topic里的数据流,就相当于消费;
3.StreamAPI:允许应用扮演流处理的作用,从一个或多个topic里消费数据流,然后产生输出流数据到其他一个或多个topic里,对输入流数据有效传输到输出口;
4.ConnectorAPI:允许运行和构建一个可重复利用的生产者和消费者,能将kafka的topic与其他存在的应用和数据库设备相连接,比如链接一个实时数据库,可以捕捉到每张表的变化。
这四个API,主要应用在IDEA上对应用程序的开发中,通过代码的形式管理Kafka。
此处提到producer和消息发送,顺便记录一下produce和consumerr在IDE中java代码层面的内容和相关配置。
代码:
首先可以写一个充当配置文件作用的接口,配置了Kafka的各种连接参数
也可以不写,在producer和consumer类里面手动配置。
:
-
public interface KafkaProperties
-
{
-
final static String zkConnect = "11.22.33.444:5555";
-
final static String groupId = "group1";
-
final static String topic = "topic1";
-
final static String kafkaServerURL = "11.22.33.444";
-
final static int kafkaServerPort = 5555;
-
final static int kafkaProducerBufferSize = 64 * 1024;
-
final static int connectionTimeOut = 20000;//等待时间ms
-
final static int reconnectInterval = 10000;//重连间隔ms
-
final static String topic2 = "topic2";
-
final static String topic3 = "topic3";
-
final static String clientId = "SimpleConsumerDemoClient";
-
}
然后是producer类的代码:
public class KafkaProducerSimple {
public static void main(String[] args) {
/**
* 1、指定当前kafka producer生产的数据的目的地
* 创建topic可以输入以下命令,在kafka集群的任一节点进行创建。
* bin/kafka-topics.sh --create --zookeeper master:2181
* --replication-factor 1 --partitions 1 --topic orderMq
*/
String TOPIC = "orderMq8";
/**
* 2、读取配置文件
*/
Properties props = new Properties();
/*
* key.serializer.class默认为serializer.class
*/
props.put("serializer.class", "kafka.serializer.StringEncoder");
/*
* kafka broker对应的主机,格式为host1:port1,host2:port2
*/
props.put("metadata.broker.list", "master:9092,slaver1:9092,slaver2:9092");
/*
* request.required.acks,设置发送数据是否需要服务端的反馈,有三个值0,1,-1
* 0,意味着producer永远不会等待一个来自broker的ack,这就是0.7版本的行为。
* 这个选项提供了最低的延迟,但是持久化的保证是最弱的,当server挂掉的时候会丢失一些数据。
* 1,意味着在leader replica已经接收到数据后,producer会得到一个ack。
* 这个选项提供了更好的持久性,因为在server确认请求成功处理后,client才会返回。
* 如果刚写到leader上,还没来得及复制leader就挂了,那么消息才可能会丢失。
* -1,意味着在所有的ISR都接收到数据后,producer才得到一个ack。
* 这个选项提供了最好的持久性,只要还有一个replica存活,那么数据就不会丢失
*/
props.put("request.required.acks", "1");
/*
* 可选配置,如果不配置,则使用默认的partitioner partitioner.class
* 默认值:kafka.producer.DefaultPartitioner
* 用来把消息分到各个partition中,默认行为是对key进行hash。
*/
props.put("partitioner.class", "com.bie.kafka.MyLogPartitioner");
//props.put("partitioner.class", "kafka.producer.DefaultPartitioner");
/**
* 3、通过配置文件,创建生产者
*/
Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
/**
* 4、通过for循环生产数据
注意,Message_ 是partitionKey用来配合partitioner分区规则来对消息进行分区
*/
for (int messageNo = 1; messageNo < 100000; messageNo++) {
String messageStr = new String("Messgae_"+ messageNo);
/**
* 5、调用producer的send方法发送数据
* 注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发
*/
producer.send(new KeyedMessage<String, String>(TOPIC, messageNo + "", "appid" + UUID.randomUUID() + messageStr));
//producer.send(new KeyedMessage<String, String>(TOPIC, messageNo + "", "appid" + UUID.randomUUID() + "biexiansheng"));
}
}
}
下面是consumer类的代码:
public class KafkaConsumerSimple implements Runnable {
public String title;
public KafkaStream<byte[], byte[]> stream;
public KafkaConsumerSimple(String title, KafkaStream<byte[], byte[]> stream) {
this.title = title;
this.stream = stream;
}
@Override
public void run() {
System.out.println("开始运行 " + title);
ConsumerIterator<byte[], byte[]> it = stream.iterator();
/**
* 不停地从stream读取新到来的消息,在等待新的消息时,hasNext()会阻塞
* 如果调用 `ConsumerConnector#shutdown`,那么`hasNext`会返回false
* */
while (it.hasNext()) {
MessageAndMetadata<byte[], byte[]> data = it.next();
Object topic = data.topic();
int partition = data.partition();
long offset = data.offset();
String msg = new String(data.message());
System.out.println(String.format(
"Consumer: [%s], Topic: [%s], PartitionId: [%d], Offset: [%d], msg: [%s]",
title, topic, partition, offset, msg));
}
System.out.println(String.format("Consumer: [%s] exiting ...", title));
}
public static void main(String[] args) throws Exception{
Properties props = new Properties();
props.put("group.id", "biexiansheng");
props.put("zookeeper.connect", "master:2181,slaver1:2181,slaver2:2181");
props.put("auto.offset.reset", "largest");
props.put("auto.commit.interval.ms", "1000");
props.put("partition.assignment.strategy", "roundrobin");
ConsumerConfig config = new ConsumerConfig(props);
String topic1 = "orderMq8";
//String topic2 = "paymentMq";
//只要ConsumerConnector还在的话,consumer会一直等待新消息,不会自己退出
ConsumerConnector consumerConn = Consumer.createJavaConsumerConnector(config);
//定义一个map
Map<String, Integer> topicCountMap = new HashMap<>();
topicCountMap.put(topic1, 3);
//Map<String, List<KafkaStream<byte[], byte[]>> 中String是topic, List<KafkaStream<byte[], byte[]>是对应的流
Map<String, List<KafkaStream<byte[], byte[]>>> topicStreamsMap = consumerConn.createMessageStreams(topicCountMap);
//取出 `kafkaTest` 对应的 streams
List<KafkaStream<byte[], byte[]>> streams = topicStreamsMap.get(topic1);
//创建一个容量为4的线程池
ExecutorService executor = Executors.newFixedThreadPool(3);
//创建20个consumer threads
for (int i = 0; i < streams.size(); i++) {
executor.execute(new KafkaConsumerSimple("消费者" + (i + 1), streams.get(i)));
}
}
}
Kafka在SpringBoot框架中整合了之后开发流程如下:
配置统一写在application.yml文件中。
# kafka
spring:
kafka:
# kafka服务器地址(可以多个)
bootstrap-servers:
consumer:
# 指定一个默认的组名
group-id: kafka2
# earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
# latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
# none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
auto-offset-reset: earliest
# key/value的反序列化
key-deserializer: org
.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org
.apache.kafka.common.serialization.StringDeserializer
producer:
# key/value的序列化
key-serializer: org
.apache.kafka.common.serialization.StringSerializer
value-serializer: org
.apache.kafka.common.serialization.StringSerializer
# 批量抓取
batch-size:
# 缓存容量
buffer-memory:
# 服务器地址
bootstrap-servers:
#配置项还有很多可以按需配置
或者将配置文件写在application.properties里(springboot1.5)
#============== kafka ===================
# 指定kafka 代理地址,可以多个
spring.kafka.bootstrap-servers=
#=============== producer =======================
spring.kafka.producer.retries=0
# 每次批量发送消息的数量
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
# 指定消息key和消息体的编解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#=============== consumer =======================
# 指定默认消费者group id
spring.kafka.consumer.group-id=test-consumer-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
# 指定消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
然后在 Spring Boot 中就可以使用 KafkaTemplate 发送消息,使用 @KafkaListener 监听消费指定主题的消息,在监听中设置监听的 topic ,topics 是一个数组所以是可以绑定多个主题的,可以同时监听多个 topic 的消息。。简单演示代码如下
例一:
@Controller
@EnableAutoConfiguration
public class SampleController {
public static Logger logger = LoggerFactory.getLogger(SampleController.class);
@Autowired
private KafkaTemplate<String, String> template;
@RequestMapping("/send")
@ResponseBody
String send(String topic, String key, String data) {
template.send(topic, key, data);
return "success";
}
public static void main(String[] args) throws Exception {
SpringApplication.run(SampleController.class, args);
}
@KafkaListener(id = "t1", topics = "t1")
public void listenT1(ConsumerRecord<?, ?> cr) throws Exception {
logger.info("{} - {} : {}", cr.topic(), cr.key(), cr.value());
}
@KafkaListener(id = "t2", topics = "t2")
public void listenT2(ConsumerRecord<?, ?> cr) throws Exception {
logger.info("{} - {} : {}", cr.topic(), cr.key(), cr.value());
}
}
例二:
Producer:
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;//kafkaTemplate相当于生产者
@RequestMapping(value = "/{topic}/send",method = RequestMethod.GET)
public void sendMeessage(
@RequestParam(value = "message",defaultValue = "hello world") String message,
@PathVariable final String topic) {
logger.info("start sned message to {}",topic);
ListenableFuture<SendResult<String, String>> listenableFuture = kafkaTemplate.send(topic,message);//发送消息,topic不存在将自动创建新的topic
listenableFuture.addCallback(//添加成功发送消息的回调和失败的回调
result -> logger.info("send message to {} success",topic),
ex -> logger.info("send message to {} failure,error message:{}",topic,ex.getMessage()));
}
@RequestMapping(value = "/default/send",method = RequestMethod.GET)
public void sendMeessagedefault() {//发送消息到默认的topic
logger.info("start send message to default topic");
kafkaTemplate.sendDefault("你好,世界");
}
Consumer:
//监听器必须实现MessageListener这个接口中onMessage方法
public class MyMessageListener implements MessageListener<String, String> {
public final static Logger logger = LoggerFactory.getLogger(MyMessageListener.class);
@Override//此方法处理消息
public void onMessage(ConsumerRecord<String, String> data) {
String topic = data.topic();//消费的topic
logger.info("-------------recieve message from {} topic-------------", topic);
logger.info("partition:{}", String.valueOf(data.partition()));//消费的topic的分区
logger.info("offset:{}", String.valueOf(data.offset()));//消费者的位置
logger.info("get message from {} topic : {}", topic, data.value());//接收到的消息
}
}
KafkaTemplate.send():
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
topic:主题名称;partition:要发送消息到哪个分区;timestamp:创建消息的时间;key:消息的键;value:消息的值。
Kafka的数据复制与Failover
Replica(副本):
当某个topic的replication-factor值为N且N>1时,该topic中的每个partition都会有N个副本(replica)。
副本数应小于broker数量,即对每个partition而言每个broker上最多有一个replica,(partition的所有副本会均匀分布到所有broker上)----为了保证可用性
Data replication要解决的问题:
-
如何propagate(消息传递):
Producer发送消息和consumer接收消息都是通过leader partition(主分区)来进行的,而follower partition(从分区,是leader的一个replica)会周期性的拉取主区中存储的消息数据,从而达到备份的作用,当leader宕机,follower会自动选出新的leader继续提供服务。
-
何时commit:
Commit是指leader告诉producer,你所发送的消息数据被我成功接收了。Kafka希望保证数据能够及时被follower拉取,从而保持数据不丢失和强可用性,因此何时commit很关键。Commit的方式有同步和异步两种,同步是指producer发出消息被leader接收后,待所有follower都拉取到数据后才向producer返回commit确认。异步是指只要leader拿到数据了,就返回commit。
Kafka在默认情况下用的是既不完全同步也不完全异步的方式,称为ISR方式(in-sync replica):
Leader会负责动态维护一个与其基本保持一致的replica列表,该列表称为ISR,如果列表中有一个follower比leader落后太多,或者超过一定时间没发起拉取数据请求,则leader会把它从ISR移除。当ISR中所有follower(replica)都向leader发送ack时,leader返回commit。
当leader检测到某个follower与自身基本一致且不在ISR中,会将其加入到ISR。
Consumer只能读取到被commit的数据
Commit策略:
Server配置
replica.lag.time.max.ms=10000//最大不拉取时间
replica.lag.max.messages=4000//最大一致性差距
Topic配置
min.insync.replicas=1//ISR中最少的副本数
Producer配置
request.required.acks=0
//默认值0,异步的,不需要leader给producer返回ACK.
//为了ISR的发挥一般设置为-1. 若设置为
-1,producer会在所有备份的partition收到消息时得到broker的确认,这个设置可以得到最高的可靠性保证。
3.如何处理replica恢复
Leader只会commit ISR列表中所有partition都获取到了的消息,leader宕机后,从ISR列表中选举出新的leader,新leader继续按照与之前一样的原则维护ISR,当宕机的partition重启后,他会将之前commit的数据恢复并通过拉取数据来追赶leader,直到重新加入ISR。
在上图的情况中,leaderA在commit M3之前就宕机了,M3没有被ISR列表中的replica 从机拉取到,于是M3就丢失了,这是kafka可能会造成数据丢失的情形。但是M3从来没有被leader commit过,因此producer并不会认为自己发送的M3被成功接收了,它会retry重新发送M3,直到达到retry次数上限。若达到上限M3还没有被commit,那M3可能就真的丢失了。
4.如何处理replica全部宕机:
Kafka提供用户两种选择,可以通过配置来改变,默认第二种
等待ISR中任一Replica恢复,并选它为Leader
等待时间较长,降低可用性
或ISR中的所有Replica都无法恢复或者数据丢失,则该Partition将永不可用
选择第一个恢复的Replica为新的Leader,无论它是否在ISR中(默认配置)
并未包含所有已被之前Leader Commit过的消息,因此会造成数据丢失
可用性较高
更多推荐
已为社区贡献2条内容
所有评论(0)