Kafka(二)— 原理解析
Kafka(二)— 原理解析
kafka(二)— 图解部分原理
1.生产者原理
1.首先创建main线程,创建Producer对象,调用send方法发送数据,遇到拦截器就处理数据,然后将数据序列化(kafka本身的序列化器,开发要手动指定),然后进入分区器,根据分区数进行分区
2.然后将分好区的数据发往一个位于内存的缓冲区(默认32M),每个分区对应一个双端队列,生产者发送数据的批次大小为16K,这里实际上是这样的,32M的内存被分成N个内存块,放到一个内存池里,创建一个Batch,就从缓冲池里取一个16K的内存块就好了,如果该数据到达了Kafka,那么将该内存块还回去就好了,这样避免了频繁的垃圾回收,不会出现频繁的STW现象
3.sender线程的Sender读取数据,读取的临界条件是每个队列内的数据大小总和达到batch.size(默认16K)或者sender等待时间达到了linger.ms(默认0ms)
4.每个队列的数据被“转换”成请求,放入请求队列,去请求集群上对应节点的对应分区,要通过底层创建的Selector传送到集群,分区应答机制分为三种—①应答机制为0:生产者只需要发送数据,其余啥都不用管 ②应答机制为1 需要等待leader收到数据后才返回应答 ③ -1(all) leader和follower都收到数据后才应答
。若节点不应答,最多可累计5条请求
5.发送成功到Kafka集群,那么清除内存中对应的数据,不成功重试(重试次数可达int的最大值次)
2.主题分区
主题为什么分区
很明显,将数据分区,可以合理的使用存储资源
其次,可以提高并发写性能,生产者可以以分区为单位进行发送数据。同时消费者可以分区为单位进行消费数据,提高数据处理能力,提高了读性能
主题分区策略
假如我只有3台服务器,但我对某主题分了9个分区,每个分区2个副本,那我这9个分区的leader在哪?follower在哪? 注意,副本数不能超过机器数(超过会报错)
测试
./kafka-topics.sh --bootstrap-server hadoop102:9092 --topic topicx --create --partitions 9 --replication-factor 2
./kafka-topics.sh --bootstrap-server hadoop102:9092 --topic topicx --describe
可以看到,leader
采取了轮询的方式,实现了leader自动平衡
。关于follower,以每个节点为单位看,我们可以看到,位于节点0的leader的副本策略是这样的,[0,1] [0,2] [0,1],如果再来一个leader位于0,那它的副本策略是[0,2],也是为了负载均衡
生产经验 修改分区副本的位置
假如我想把分区0的副本由[1,2]变为[0,1]
在kafka目录下vim increase-replication-factor.json
{
"version":1,
"partitions":[{"topic":"topicx","partition":0,"replicas":[0,1]}
}
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute
此外,还可以通过种方式增加副本,假如我把上述json文件再修改一下,改成[0,1,2],那该分区的副本数就会增加!
数据分区策略(数据被分到哪个分区规则)
可以看到,生产者的构造函数有以下6种
对于前4种,我们可以通过第二个参数直接指定分区
对于第5种,虽然没有指定分区,但是指定了key,那么数据在哪个分区由 key的hash值与topic的partition数进行取余操作得到分区号,比如我们可以将一张mysql的表名字作为key,那么这张表的数据就会进入一个分区了
对于第6种,既没有partition值又没有key值的情况下,Kafka采用Sticky Partition(黏性分区器),会随机选择一个分区,并尽可能一直使用该分区,待该分区的batch已满或者已完成,Kafka再随机一个分区进行使用(和上一次的分区不同)
例如:第一次随机选择0号分区,等0号分区当前批次满了(默认16k)或者linger.ms设置的时间到, Kafka再随机一个分区进行使用(如果还是0会继续随机)
自定义数据分区器
实现Partitioner接口,重写方法
public class MyPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 发送的数据 value
String str = value.toString();
int partition;
if(str.length() <= 3){
partition = 0;
}else {
partition = 1;
}
return partition;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
在属性中指定分区类,发送消息就会根据分区器进行发送了
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.gzhu.MyPartitioner");
如何合理设置分区数
首先根据生产经验来确定初始值,然后根据每个分区的生产者吞吐量和消费者吞吐量进行进一步确定
分区数设置过多,有什么坏处?
首先我们要清楚,分区数越多,理论上来说整个集群的吞吐量越大,但是分区数多了弊端也多
1.内存方面
- 生产者发送数据,内存中的每一个双端队列对饮一个分区,分区多了,那么这部分占用的缓存也会多
- 分区数多了,那么消费者为了消费及时,消费者的数量也会增多
2.磁盘方面
数据是以分区为单位存在磁盘中的,分区多了,那么占用的空间也会增多
3.端到端延迟方面
端到端延迟:消费者接收消息的时间减去生产者发送消息的时间
我们知道,kafka只有在消息提交之后,才会将消息暴露给消费者,分区越多,那么副本之间同步的数据就会越多,因此,ISR副本集合间的赋值数据所花的时间是端到端延迟的主要部分
3.生产者吞吐量
主要有4种方式改变吞吐量
1.改变RecordAccumulator的大小,缓冲区大了,数据处理能力也强了
2.改变batch.size的大小,这个要根据实际需求改变,如果太小导致发送数据太频繁,太大导致数据延迟高
3.改变linger.ms的时间,如果太小导致发送数据太频繁,太大导致数据延迟高
4.修改压缩方式
// 更改缓冲区大小 默认33554432 = 32M
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
// 批次大小 默认16K 16384
properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
// linger.ms 1ms
properties.put(ProducerConfig.LINGER_MS_CONFIG,1);
// 压缩类型 snappy用多一些
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");
4.数据可靠性 ISR原理 副本机制
每个队列的数据被转换成请求队列,去请求集群上对应节点的对应分区,要通过底层创建的Selector传送到集群,集群应答机制分为三种
①应答机制为0 生产者只需要发送数据,其余啥都不用管 ,很明显,如果leader没收到,就丢失了,但是生产者认为发送成功了,因此这种机制很不可靠,基本不用
②应答机制为1 需要等待leader收到数据后才返回应答 ,也有缺陷,leader收到某数据并且应答了,但是follower还没来得及同步,此时leader挂了,数据没有备份且唯一收到数据的leader还挂了,此时生产者已经收到过应答了,认为已经发送成功了,数据就丢失了
③ -1(all) leader和follower都收到数据后才应答 看似十分可靠,但是有个问题,比如某follower同步的时候,由于故障挂掉了,此时这个follower就无法产生应答了,生产者却一致在等,那整个集群不就瘫痪了吗?
为此,Leader维护了一个动态的In-Sync Replicas(ISR),意为和Leader保持正常同步的Follower+Leader集合 (leader:0,isr:0,1,2),因此我们只要关注ISR里节点的应答就可以
如果Follower在30s内未向Leader发送通信请求或同步数据,则该Follower将被移出ISR。该时间阈值由replica.lag.time.max.ms参数设定,默认30s,即只要一个 Follower 副本落后Leader 副本的时间不连续超过 30 秒,Kafka 就认为该 Follower 副本与 Leader 是同步的,即使 Follower 副本中保存的消息明显少于 Leader 副本中的消息。例如节点2的副本超时,则(leader:0, isr:0,1)
这样就不用等长期联系不上或者已经故障的节点
如果分区副本设置为1个,或者ISR里应答的最小副本数量( min.insync.replicas 默认为1)设置为1,和ack=1的效果是一样的,仍然有丢数的风险(leader:0,isr:0)。因此,想要保证数据完全可靠,ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2,回到当初的问题,[某follower同步的时候,由于故障挂掉了,此时这个follower就无法产生应答了,生产者却一致在等那整个集群不就瘫痪了吗?],由于ISR机制,这个follower被踢出,生产者等到了另外两个的应答,也视为成功
副本机制
上面的描述,我们已经知道什么是ISR,实际上,kafka的副本机制完整的是这样的
图来自公众号 - 华仔聊技术
1)AR 副本集合: 分区 Partition 中的所有 Replica 组成 AR 副本集合。
2)ISR 副本集合: 所有与 Leader 副本能保持一定程度同步的 Replica 组成 ISR 副本集合, 其中也包括 Leader 副本
3)OSR 副本集合: 与 Leader 副本同步滞后过多的 Replica 组成 OSR 副本集合
机制选择
在生产环境中,acks=0很少使用
acks=1,一般用于传输普通日志,允许丢个别数据
acks=-1,一般用于传输和钱相关的数据,对可靠性要求比较高的场景
在开发中,主要配置两个参数
// 应答机制,默认为all
properties.put(ProducerConfig.ACKS_CONFIG,"all");
// 重试次数,默认int的最大值次
properties.put(ProducerConfig.RETRIES_CONFIG,1000);
5.数据重复问题 幂等性原理
先明白一点,我们目前针对的某个分区来分析的
假如对于某个分区,一个leader和两个follower,都成功收到了数据,但是就在返回应答的时候,leader挂了,那么生产者收不到全部的应答(应答机制为all),由于ISR机制,生产者会重新发送数据,此时原来的两个follower(有个follower在leader挂了后会成为新的leader)已经有了数据,再接收一次就产生了分区出现了数据重复问题
关于发送次数的一些场景 — kafka消息语义
1.若至少发送一次(At Least Once)= ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2
2.若发送最多一次(At Most Once)= ACK级别设置为0
At Least Once可以保证数据不丢失,但是不能保证数据不重复
At Most Once可以保证数据不重复,但是不能保证数据不丢失(实际我们不用)
3.精确一次(Exactly Once):对于一些非常重要的信息,比如和钱相关的数据,要求数据既不能重复也不丢失,我们怎么实现呢???幂等性
幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端某分区都只会持久化一条,保证了不重复
幂等性主要通过三个属性来实现:具有<PID, Partition, SeqNumber>相同主键
的消息提交时,Broker某分区只会持久化一条
其中PID是Kafka每次重启都会分配一个新的;Partition表示分区号;Sequence Number是单调自增的,是每条数据的序列化号,不同数据是不同的
很明显,幂等性只能保证的是在单会话单分区内不重复,为什么只能保证单会话?示例,对于一个消息a,kafka集群启动了两次,由于PID不同,分区认为这是不同的数据,不会认为是相等的数据(实际都是数据a)
再回到当初的情景,节点0挂了,生产者重新发送数据,新的leader再次接收到a时,由于PID没变,分区号没变,SeqNumber没变,重复了,不接受,成功解决分区内的数据重复问题!
开发代码中如何实现幂等性?
开启参数enable.idempotence默认为true,false 关闭,实际上,底层幂等性是默认开启的,我们不写也可以
6.Kafka事务原理
幂等性只能保证的是在单分区单会话内不重复,幂等性不能跨多个分区。多个分区之间不会重复数据怎么实现,怎么办? 事务,kafka事务可以保证对多个分区写入操作的原子性
开启事务,必须开启幂等性,事务底层依赖幂等性
首先确定使用哪个事务协调器(因为每个节点都有事务协调器,啥?啥是事务协调器?kafka集群和客户端进行事务通信总得有个东西吧?就是通过它!),如何确定?集群中有一个特殊的主题,用来存储事务的信息,默认有50个分区,每个分区都会负责一部分事务。如何确定事务属于50个中的哪个分区?需要程序员手动输入一个transactional.id(全局唯一),将transactional.id的hashcode%50计算出属于哪个分区,该分区leader所在的节点上那个事务控制器就是我们要使用的
我们假设我们的主题是3个分区,且使用broker0的事务控制器
1.因为事务依赖于幂等性,保证单个分区内的数据不重复,所以生产者首先要向事务协调器获取Producer ID,也即PID
2.事务协调器返回当前的PID
3.开启事务后生产者向集群的不同分区发送数据,是原子性操作
4.告诉事务协调器事务提交commit请求
5.事务协调器通知事务主题该请求,让其进行持久化操作,这样保证了客户端挂了后,重启后事务可以继续处理未完成的数据
6.事务协调器告知生产者事务提交成功
7.事务协调器会发送请求到每个分区,确认每个分区是否收到了数据,确保事务成功
8.每个分区应答给事务协调器,告知一切正常
9.事务协调器收到通知后,将该条事务的执行成功持久化,确保不会再执行该事务了
IDEA 事务
public class TransactionsTest {
public static void main(String[] args) {
// 1.配置属性
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.10.102:9092");
// 指定对应的key和value序列化
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// 指定事务id 任意取,但唯一
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"100");
// 2.创建生产者
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
// 3.初始化并开启事务
producer.initTransactions();
producer.beginTransaction();
try {
// 4.发送数据
for (int i = 0; i < 5; i++) {
producer.send(new ProducerRecord<>("first","Hello " + i));
}
// 5.提交事务
producer.commitTransaction();
}catch (Exception e){
// 有异常终止事务
producer.abortTransaction();
}finally {
// 6.关闭生产者
producer.close();
}
}
}
7.数据有序原理
如果保证分区内有序,是有条件的,这里的有序是指生产者发送 1 2 3,集群收到的数据为1 2 3,而不是2 1 3 等等
什么条件呢?
没有开启幂等性,数据没有序列化号(序列化号大的是后来的,因为序列化号是递增的)!不知道谁先来的,谁后来的,因此为了保证有序,直接设置下图中请求只能存一个,max.in.flight.requests.per.connection需要设置为1,当前请求成功后才发送下一个请求
不开启幂等性并且max.in.flight.requests.per.connection不为1,假设当有两个请求时,请求1先来的,但是请求失败了,请求2成功了,这样请求1再次请求成功后,1在2的后面了,因此只能max.in.flight.requests.per.connection只能为1
开启了幂等性,最大的好处是有序列化号了,可以根据序列化号判断先后了!
max.in.flight.requests.per.connection需要设置小于等于5,这样我就能保证最近5个请求时有序的了
开启幂等性,假设当有两个请求时,请求1先来的,但是请求失败了,请求2成功了,但是集群知道第一个应该是请求1,先在内存放着请求2,等待请求1,这样请求1再次请求成功后,先让1排到前面去,再落盘,保证了有序
注意,只保证了分区内有序,如果想实现分区间有序,只能所有分区数据到了集群排序了
8.ZooKeeper一些信息
1.kafka/brokers/ids 可以看到哪些节点正常运行
2.kafka/brokers/topics可以看到每个主题的每个分区的leader和ISR
3.kafka/controller是个辅助选举leader的
controller作用
- 创建、删除主题,增加分区并分配leader分区,即所有主题信息
- 集群Broker管理(新增 Broker、Broker 主动关闭、Broker 故障),即所有broker信息
- preferred leader选举
- 分区重分配
两者结合使用原理
每个kafka的节点启动后会向/brokers/ids/ 注册,然后每个节点都会有controller控制器,采取抢占式,哪个节点的controller抢到了,谁就说了算,然后由该controller监控ids并进行每个分区的leader选举,选举规则:
在ISR存活为前提,谁在AR(Assigned Replicas) 中排在前面谁就是leader(AR:分区所有副本统称),对于分区1,启动的时候AR假如是是这样的[1,0,2],那么就会选举broker1的作为leader,然后controller会将分区信息上传到zookeeper,其余的controller会同步信息(保证容错性),注意,是AR副本中,谁在前面谁是leader,ISR中存活为前提,那为什么不是直接从ISR里面选举呢?因为如果ISR没有存活副本,是可以从OSR里面选举的,需要开启unclean.leader.election.enable参数,不过这个参数默认是false
假如leader挂了或者其他原因导致leader重新选举,controller监听到了,那么controller会立马拉取zookeeper上的AR信息,根据选举规则再次选举,找活着的,并且在AR拍前面的就是新的leader
假如Controller所在节点发生故障,kafka可以感知到,Controller节点故障恢复会自动的进行,重新选举
数据在分区是怎么存储的
实际是以主题名字加分区号命名的
每个分区都会有个log文件,log是有多个Segment组成,生产者到topic的数据就被存在Segment
以Segment(1G)为单位存储,为了加快检索速度,会有个.index文件,使用索引加快查询速度,这里是稀疏索引
,当往log文件写入4kb数据时,index才会增加一条索引,也就是这4kb共用一个索引,找数据的时候先找到数据时哪个索引管,然后根据索引信息去log中找
数据清除
一.删除
每个segment最后来的数据的时间戳,超过7天删除(这个时间可以修改),也就是最后来的数据过期,才会清除,最后来的都过期了,其余的肯定过期了
二.压缩
适合K-V类型数据,对于相同的key,value只保留最新的value
9.leader和follower同步问题
LEO(Log End Offset) : 每个副本下一条待写入消息的offset,看图
HW(High Watermark):所有副本最小的LEO,上图中,Follower1的LEO最小,那么所有副本的HW为4号位,很明显,消费者只能消费HW(不包括)之前的数据
leader会比较所有ISR副本中的HW,取最小的当做该分区的HW,follower会拉取leader记录的HW和自己的比较,以leader的为准!
Follower出现故障宕机
假如上图中,Follower2出现故障,ISR首先会将节点删除,但是Follower1和Leader会继续接收数据
Follower2恢复后,会同步leader的HW,并将文件中高于等于HW的部分删掉,因为它觉得这些数据没校验,然后开始同步Leader的数据,直到该Follower的LEO大于等于该分区的HW,才算真正的恢复,加入到ISR
Leader出现故障
Leader出现故障后,根据选举规则会选择一个新的leader,那么此时新leader的LEO和HW成为新的标准,所有的follower的数据都要向新的leader看齐,也即follower多余的数据的数据会被砍掉
很明显,如果leader出现故障会导致很多问题的发生
原文参考 https://blog.csdn.net/m0_60992470/article/details/120102171
问题一 数据丢失问题
follower 副本A因为某种原因重启(重启后读取的HW是本机,宕机恢复后读取的是leader的HW)后,要把高于等于HW的数据删除掉(kafka本身这样设计的),也就是此时副本B只有数据0,然后follower 副本A会和leader B同步,但就在此时,B宕机,那么A将会被选举为Leader,B恢复后,此时该分区的HW肯定不会超过1(因此此时leader A的HW才为1,怎么会超过1呢),然后B读取leader A的HW,将自己的高于等于HW的部分删除,此时,A也删除了1,B也删除了1,数据1丢了
问题二 数据不一致问题
当follower副本B从leader副本A同步获取数据m2时,两个broker都宕机了,follower副本B所在的机器先重启成功并且follower副本B成为leader副本A,并且开始接受新的消息m3并且更新了自己的高水位。后来broker A也重启了,其所在的副本变成了follower副本A,follower副本A初始化开始根据Leader的高水位执行日志的截断操作,因为此时两副本的高水位值一样,follower副本A不需要截断,最终导致两个副本上的消息出现了错乱不一致的问题
为此,Leader Epoch诞生了
Leader Epoch:一个32位单调递增的数字,代表每一个Leader副本时代,存储于每一条消息,就是没变换一次leader,这个数值都会加1,来表示不同版本的leader
Leader Epoch Start Offset:每一个Leader副本时代的第一条消息的位移,也就是这个副本成为leader是,该副本的新来的第一条消息的offset
比如,某副本HW此时是10,成为了leader,那么这个版本的Leader Epoch Start Offset肯定是11(如果来11的话)
解决消息丢失
在场景1中,当follower副本A重启以后,它会向leader副本B发送一个LeaderEpochRequest请求
,来获取自身所处的leader epoch最新的偏移量是多少,因为followerA和Leader副本B所处的时代相同(leader epoch编码都是0),Leader副本B会返回自己的LEO,也就是2给follower副本A,A发现2比自己LEO大,所以不会删除1,当B宕机,A成为Leader,B恢复后,正常恢复就行了
解决消息不一致
开始的时候副本A是leader副本A,当两个broker在崩溃后重启后,brokerB先成功重启,follower副本B 成为Leader副本B。它会开启一个新的领导者纪元LE1,开始接受消息 m3。然后brokerA又成功重启,此时副本A很自然成为follower副本A,接着它会向leader B发送一个LeaderEpoch request请求,用来确定自己应该处于哪个领导者时代,时代不同,leader B会返回LE1时代的第一个位移,这里返回的值是1(也就是m3所在的位移)。follower B收到这个响应以后会根据这个位移1来截断日志,它知道了应该遗弃掉m2,从位移1开始同步获取日志
10.kafka高效读写原理
1.kafka本身是分布式,分区技术提高了并行度
2.数据采用稀疏索引,可以快速定位要消费的数据
3.kafka写数据时追加到log后面的,是顺序写,写的速度非常快,可达600M/s
4.页缓存+零拷贝
生产者将数据发送到kafka后,kafka会将数据放入操作系统的页缓存(Page Cache是Linux内核管理的内存区域)中,内核有内存管理的功能,它可以将数据持久化到磁盘或者留在内存。至此写结束。读的时候,kafka先看页缓存有没有数据,没有的话从磁盘读取,然后直接通过网关给消费者(如果跨节点会涉及网卡)。所以说,读取数据不走kafka应用层,提高了效率
11.消费者原理
消费方式
消费者主动从kafka拉取数据,原因很简单,不同消费者的消费速度不同,可以根据自己的能力消费,如果kafka推数据的话,因为是一个固定的速度,很难满足所有消费者,但是,kafka没数据的时候,消费者一直尝试拉数据,那就一直返回空,陷入了死循环
消费规则
单个消费者可以同时消费多个分区的数据
消费者组里消费者(消费者的groupid相同),每一个只能消费尚未被组里其他人消费的分区
如果一个消费者组的的消费者个数小于分区数,那么一个消费者可以消费多个分区(前提这几个分区没有被其他消费者正在消费)
如果消费者组的的消费者个数等于分区数,一对一刚好
如果消费者组的的消费者个数大于分区数,那就有空闲的了
offset
1.谁提交,提交到哪里?
消费记录由kafka一个特殊的主题,_consumer_offsets存储,记录每个分区消费到哪里了(offset标识),实际上是每隔5秒(可更改)将当前offset自动(也可以我们自己手动提交)提交到相应的主题。采用K-V存储,K是group.id+topic+分区号,value就是当前offset的值,这样就能保证每个消费者挂了重启后可以继续消费了
自动提交offset代码
// 是否自动提交 offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
// 提交 offset的时间周期 1000ms,默认 5s
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);
手动提交offset代码
// 是否自动提交 offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
// 消费消息.........
// 异步提交 offset 异步提交就是offset发出去,不管_consumer_offset接收到没有,就开始消费下一批数据了 同步提交就是offset发出去,_consumer_offset接收到,才开始消费下一批数据
consumer.commitAsync();
2.offset使用
可以通过设置参数来规定如何使用offset
auto.offset.reset = earliest | latest | none 默认是 latest
- earliest:分区下有已提交的offset,从提交的offset开始消费,没有offset,从头开始消费
- latest(默认值):分区下有已提交的offset,从提交的offset开始消费,没有offset,从该分区下新产生的数据开始消费
- none:分区下有已提交的offset,从提交的offset开始消费,如果未找到消费者组的先前偏移量(也就是找不到已提交的offset),则向消费者抛出异常
- 任意指定offset位移开始消费
手动指定offset
public class CustomConsumerSeek {
public static void main(String[] args) {
// 1.配置
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
// 2.反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
// 3.配置消费者id
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"100");
// 4.创建消费者
KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);
// 5.订阅主题
ArrayList<String> topics = new ArrayList<>();
topics.add("topicx");
consumer.subscribe(topics);
// 6.指定offset
// 6.1 获取当前主题的分区信息
Set<TopicPartition> assignment = consumer.assignment();
// 6.2 保证分区已存在
while(assignment.size() == 0){
consumer.poll(Duration.ofSeconds(1));
assignment = consumer.assignment();
}
// 6.3 指定offset
for(TopicPartition topicPartition : assignment){
consumer.seek(topicPartition,200);
}
// 6.消费
while(true){
ConsumerRecords<String,String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.partition()+" " + record.offset() + " "+ record.value());
}
}
}
}
指定消费一定时间内的数据(利用offset的时间戳)
public class CustomConsumerSeekTime {
public static void main(String[] args) {
// 1.配置
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
// 2.反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
// 3.配置消费者id
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"1111");
// 4.创建消费者
KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);
// 5.订阅主题
ArrayList<String> topics = new ArrayList<>();
topics.add("first");
consumer.subscribe(topics);
// 6.指定offset
// 6.1 获取当前主题的分区信息
Set<TopicPartition> assignment = consumer.assignment();
// 6.2 保证分区已存在
while(assignment.size() == 0){
consumer.poll(Duration.ofSeconds(1));
assignment = consumer.assignment();
}
// 6.3 指定offset的时间 每个分区对应时间
HashMap<TopicPartition, Long> hashMap = new HashMap<>();
for(TopicPartition topicPartition : assignment){
hashMap.put(topicPartition,System.currentTimeMillis() - 24 * 3600 * 1000);
}
// 通过时间获取offset
Map<TopicPartition, OffsetAndTimestamp> map = consumer.offsetsForTimes(hashMap);
// 6.4 指定offset
for(TopicPartition topicPartition : assignment){
OffsetAndTimestamp time = map.get(topicPartition);
consumer.seek(topicPartition,time.offset());
}
// 7.消费
while(true){
ConsumerRecords<String,String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record);
}
}
}
}
哪个消费者消费哪个分区呢?
1.Range策略(针对每个主题):
首先将主题的分区按照序号排序,然后将消费者也排序
假如有个主题有7个分区t1-t7,三个消费者C1-C3
然后通过分区数/消费者数
来决定每个消费者应该消费几个分区,如果除不尽,那么前面几个消费者将会多消费1个分区
7/3 = 2… 所以每个消费者应该消费2个,多的一个由第一个分区消费
也就是C1消费t1-t3 C2消费t4-t5 C3消费t6-t7,假如其中一个C3挂了,那么这个消费者要消费的分区数据先不会被消费,而是等45秒,等过了45秒后还没被消费就确定真的挂了,那么t6-t7会被分到另外的消费者,实现再平衡
这仅仅是针对一个主题而言
,如果很多主题这样,那么C1将会比其他分区多消费N个!导致数据倾斜
2.RoundRobin策略(针对所有主题)
把所有的分区和所有的消费者列出来,按照hashcode排序,然后通过轮询算法分配
假如有个主题有7个分区t1-t7,三个消费者C1-C3,均是排好序的
那t1给C1 t2给C2 t3给C3 t4给C1 t5给C2…
3. Sticky策略
和Range策略不同的是分区不会排序,而是随机分到每个分区(7/3 = 2… 所以每个消费者应该消费2个,多的一个由第一个分区消费),也就是C1可能消费t2 t3 t6 ,C2消费t1 t4 C3消费t5 t7
4.IDEA配置分区策略
XX是全类名
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"XX")
消费者过程
1.消费者发送拉取请求到ComsumerNetworkClient
2.请求传到集群
3.当等待时间超过fetch.max.wait.ms(默认500ms)或者数据批次超过Fetch.min.bytes(默认1字节,最大是50M)集群发送数据到一个消息队列
4.消费者从队列中拉取数据,默认最大是500条
5.经过序列化器和拦截器后得到最后的数据
消费者提高吞吐量(数据积压了怎么办)
1.可以尽可能让消费者数 = 分区数
2.可以提高批次拉取的大小
3.生产者也可以调整4个参数来缓解
消费者重平衡机制(重要)
来源公众号:华仔聊技术
所谓的消费者组的重平衡目的就是让组内所有的消费者实例对消费哪些主题分区达成一致
对于 Consumer Group 来说,可能随时都会有 Consumer 加入或退出,那么 Consumer 列表的变化必定会引起 Partition 的重新分配。我们将这个分配过程叫做 Consumer Rebalance,但是这个分配过程需要借助 Broker 端的Coordinator 协调者组件
,在 Coordinator 的帮助下完成整个消费者组的分区重分配,也是通过监听ZooKeeper 的 /admin/reassign_partitions 节点触发的
Rebalance 的触发条件有三种
1)当 Consumer Group 组成员数量发生变化(主动加入或者主动离组,故障下线等)
2)当订阅主题数量发生变化
3)当订阅主题的分区数发生变化
Rebalance 触发后如何通知其他 Consumer 进程?
Rebalance 触发后如何通知其他 Consumer 进程?
Rebalance 的通知机制就是靠 Consumer 端的心跳线程,它会定期发送心跳请求到 Broker 端的 Coordinator 协调者组件,当协调者决定开启 Rebalance 后,它会将「REBALANCE_IN_PROGRESS」封装进心跳请求的响应中发送给 Consumer ,当 Consumer 发现心跳响应中包含了「REBALANCE_IN_PROGRESS」,就知道是 Rebalance 开始了
Rebalance 协议说明
其实 Rebalance 本质上也是一组协议,Consumer Group 与 Coordinator 共同使用它来完成 Consumer Group 的 Rebalance
下面我看看这5种协议完成了什么功能:
1)Heartbeat 请求:Consumer 需要定期给 Coordinator 发送心跳来证明自己还活着
2)LeaveGroup 请求:主动告诉 Coordinator 要离开 Consumer Group
3)SyncGroup 请求:Group Leader Consumer 把分配方案告诉组内所有成员
4)JoinGroup 请求:成员请求加入组
5)DescribeGroup 请求:显示组的所有信息,包括成员信息,协议名称,分配方案,订阅信息等。通常该请求是给管理员使用
Coordinator 在 Rebalance 的时候主要用到了前面4种请求
Rebalance 流程分析
JoinGroup请求
组内所有成员向 Coordinator 发送 JoinGroup 请求,请求加入组,顺带会上报自己订阅的 Topic,这样 Coordinator 就能收集到所有成员的 JoinGroup 请求和订阅 Topic 信息,Coordinator 就会从这些成员中选择一个担任这个Consumer Group 的 Leader「一般情况下,第一个发送请求的 Consumer 会成为 Leader」
这里说的 Leader 是指具体的某一个 Consumer,它的任务就是收集所有成员的订阅 Topic 信息,然后制定具体的消费分区分配方案。待选出 Leader 后,Coordinator 会把 Consumer Group 的订阅 Topic 信息封装进 JoinGroup 请求的 Response 中,然后发给 Leader ,然后由 Leader 统一做出分配方案后,进入到下一步
SyncGroup 请求
Leader 开始分配消费方案,即哪个 Consumer 负责消费哪些 Topic 的哪些 Partition
一旦完成分配,Leader 会将这个分配方案封装进 SyncGroup 请求中发给 Coordinator ,其他成员也会发 SyncGroup 请求,只是内容为空,待 Coordinator 接收到分配方案之后会把方案封装进 SyncGroup 的 Response 中发给组内各成员, 这样各自就知道应该消费哪些 Partition 了
12.不支持读写分离
原因
1.场景不一致:读写分离适合读操作频繁,写操作相对不频繁的场景,kafka显然不符合
2.mysql的读写分离是这样的,主库更新写操作,从库执行查询读操作,但是,kafka的leader和follower要保持数据一致,如果允许follower提供读服务,可能带来消息滞后的问题
13.Kafka性能问题
对于 Kafka 性能的优化,主要体现在生产者和消费者这两部分业务逻辑中。而 Kafka 本身不需要多关注的主要原因是,对于绝大多数使用Kafka 的业务来说,Kafka 本身的处理能力要远大于业务系统的处理能力。Kafka 单个节点,消息收发的性能可以达到每秒钟处理几十万条消息的水平,还可以通过水平扩展 Broker 的实例数成倍地提升处理能力,对于业务系统处理的业务逻辑要复杂一些,单个节点每秒钟处理几百到几千次请求,已经非常不错了,所以我们应该更关注的是消息的收发两端而不是Kafka
生产者方面
对于发送消息的业务逻辑,只需要注意设置合适的并发和批量大小,就可以达到很好的发送性能。我们知道 Producer 发消息给 Broker 且收到消息并返回 ack 响应,假设这一次过程的平均时延是 1ms,它包括了下面这些步骤的耗时:
1)发送端在发送网络请求之前的耗时
2)发送消息和返回响应在网络传输中的耗时
3)Broker 端处理消息的时延
假设此时你的发送端是单线程,每次只能发送 1 条消息,那么每秒只能发送 1000 条消息,这种情况下并不能发挥出 Kafka 的真实性能。此时无论是增加每次发送消息的批量大小,还是增加并发,都可以成倍地提升发送性能。
如果当前发送端是在线服务的话,比较在意请求响应时延,此时可以采用并发方式来提升性能
如果当前发送端是离线服务的话,更关注系统的吞吐量,发送数据一般都来自数据库,此时更适合批量读取,批量发送来提升性能
另外还需要关注下消息体是否过大,如果消息体过大,势必会增加 IO 的耗时,影响 Kafka 生产和消费的速度,也可能会造成消息积压
消费端性能优化
消费端的性能优化除了优化业务逻辑外,也可以通过水平扩容,增加消费端的并发数来提升总体的消费性能。需要注意的是,在扩容 Consumer 的实例数量的同时,必须同步扩容主题中的分区数量,确保 Consumer 的实例数和分区数量是相等的,如果 Consumer 的实例数量超过分区数量,这样的扩容实际上是没有效果的
消息积压后如何处理
导致消息积压突然增加,只有两种:发送变快了或者消费变慢了
假如赶上大促或者抢购时,短时间内不太可能优化消费端的代码来提升消费性能,此时唯一的办法是通过扩容消费端的实例数来提升总体的消费能力。如果短时间内没有足够的服务器资源进行扩容,只能降级一些不重要的业务,减少发送方发送的数据量,最低限度让系统还能正常运转,保证重要业务服务正常
更多推荐
所有评论(0)