kafka
kafkaproducer pushconsumer pull 长轮训多个broker rebalancetopicpartition内保持有序 按key分区group 组播连续存储,写入很快,按offset访问按segment删除sendfile不走应用层 不能做死信队列和延迟队列rabbitmq可以 因为用mmap做零拷贝rocketmq按级别延迟activemq也不可以kafka 如何实现死
kafka
producer push
consumer pull 长轮训
多个broker rebalance
topic
partition内保持有序 按key分区
group 组播
连续存储,写入很快,按offset访问
按segment删除
sendfile不走应用层 不能做死信队列和延迟队列
rabbitmq可以 因为用mmap做零拷贝
rocketmq按级别延迟
activemq也不可以
kafka 如何实现死信和延迟
重复消费
场景:
- 消费者消费处理时间过长,导致提交新offset失败
- 触发rebalance
消息丢失
需要分为三个阶段来分析:
3. producer发送失败
4. dmq本身丢失存储
5. 消费失败
Kafka消费者以消费者组(Consumer Group)的形式消费一个topic,发布到topic中的每个记录将传递到每个订阅消费者者组中的一个消费者实例。Consumer Group 之间彼此独立,互不影响,它们能够订阅相同的一组主题而互不干涉。生产环境中消费者在消费消息的时候若不考虑消费者的相关特性可能会出现重复消费的问题。
在讨论重复消费之前,首先来看一下kafka中跟消费者有关的几个重要配置参数。
enable.auto.commit 默认值true,表示消费者会周期性自动提交消费的offset
auto.commit.interval.ms 在enable.auto.commit 为true的情况下, 自动提交的间隔,默认值5000ms
max.poll.records 单次消费者拉取的最大数据条数,默认值
max.poll.interval.ms 默认值5分钟,表示若5分钟之内消费者没有消费完上一次poll的消息,那么consumer会主动发起离开group的请求
在常见的使用场景下,我们的消费者配置比较简单,特别是集成Spring组件进行消息的消费,通常情况下我们仅需通过一个注解就可以实现消息的消费。例如如下代码:
这段代码中我们配置了一个kafka消费注解,制定消费名为"test1"的topic,这个消费者属于"group1"消费组。开发者只需要对得到的消息进行处理即可。那么这段 代码中的消费者在这个过程中是如何拉取消息的呢,消费者消费消息之后又是如何提交对应消息的位移(offset)的呢?
实际上在autocommit=true时,当上一次poll方法拉取的消息消费完时会进行下一次poll,在经过auto.commit.interval.ms间隔后,下一次调用poll时会提交所有已消费消息的offset。
为了验证consumer自动提交的时机,配置消费者参数如下:
为了便于获取消费者消费进度,以下代码通过kafka提供的相关接口定时每隔5s获取一次消费者的消费进度信息,并将获取到的信息打印到控制台。
对于topic test1,为了便于观察消费情况,我们仅设置了一个partition。对于消费者组group1的配置参数,消费者会单次拉取消息数20条,消费每条消息耗费1s,部分记录日志打印结果如下:
从日志中可以看出,消费组的offset每40s更新一次,因为每次poll会拉取20条消息,每个消息消费1s,在第一次poll之后,下一次poll因为没有达到auto.commit.interval.ms=30s,所以不会提交offset。第二次poll时,已经经过40s,因此这次poll会提交之前两次消费的消息,offset增加40。也就是说只有在经过auto.commit.interval.ms间隔后,并且在下一次调用poll时才会提交所有 已消费消息的offset。
考虑到以上消费者消费消息的特点,在配置自动提交enable.auto.commit 默认值true情况下,出现重复消费的场景有以下几种:
Consumer 在消费过程中,应用进程被强制kill掉或发生异常退出。
例如在一次poll500条消息后,消费到200条时,进程被强制kill消费导致offset 未提交,或出现异常退出导致消费到offset未提交。下次重启时,依然会重新拉取这500消息,这样就造成之前消费到200条消息重复消费了两次。因此在有消费者线程的应用中,应尽量避免使用kill -9这样强制杀进程的命令。
消费者消费时间过长
max.poll.interval.ms参数定义了两次poll的最大间隔,它的默认值是 5 分钟,表示你的 Consumer 程序如果在 5 分钟之内无法消费完 poll 方法返回的消息,那么 Consumer 会主动发起“离开组”的请求,Coordinator 也会开启新一轮 Rebalance。若消费者消费的消息比较耗时,那么这种情况可能就会出现。
为了复现这种场景,我们对消费者重新进行了配置,消费者参数如下:
在消费过程中消费者单次会拉取11条消息,每条消息耗时30s,11条消息耗时 5分钟30秒,由于max.poll.interval.ms 默认值5分钟,所以理论上消费者无法在5分钟内消费完,consumer会离开组,导致rebalance。
实际运行日志如下:
可以看到在消费完第11条消息后,因为消费时间超出max.poll.interval.ms 默认值5分钟,这时consumer已经离开消费组了,开始rebalance,因此提交offset失败。之后重新rebalance,消费者再次分配partition后,再次poll拉取消息依然从之前消费过的消息处开始消费,这样就造成重复消费。而且若不解决消费单次消费时间过长的问题,这部分消息可能会一直重复消费。
对于上述重复消费的场景,若不进行相应的处理,那么有可能造成一些线上问题。为了避免因重复消费导致的问题,以下提供了两种解决重复消费的思路。
第一种思路是提高消费能力,提高单条消息的处理速度,例如对消息处理中比 较耗时的步骤可通过异步的方式进行处理、利用多线程处理等。在缩短单条消息消费时常的同时,根据实际场景可将max.poll.interval.ms值设置大一点,避免不 必要的rebalance,此外可适当减小max.poll.records的值,默认值是500,可根 据实际消息速率适当调小。这种思路可解决因消费时间过长导致的重复消费问题, 对代码改动较小,但无法绝对避免重复消费问题。
第二种思路是引入单独去重机制,例如生成消息时,在消息中加入唯一标识符如消息id等。在消费端,我们可以保存最近的1000条消息id到redis或mysql表中,配置max.poll.records的值小于1000。在消费消息时先通过前置表去重后再进行消息的处理。
此外,在一些消费场景中,我们可以将消费的接口幂等处理,例如数据库的查 询操作天然具有幂等性,这时候可不用考虑重复消费的问题。对于例如新增数据的操作,可通过设置唯一键等方式以达到单次与多次操作对系统的影响相同,从而使接口具有幂等性。
kafka partition(分区)与 group
一、
1、原理图
2、原理描述
一个topic 可以配置几个partition,produce发送的消息分发到不同的partition中,consumer接受数据的时候是按照group来接受,kafka确保每个partition只能同一个group中的同一个consumer消费,如果想要重复消费,那么需要其他的组来消费。Zookeerper中保存这每个topic下的每个partition在每个group中消费的offset
新版kafka把这个offsert保存到了一个__consumer_offsert的topic下
这个__consumer_offsert 有50个分区,通过将group的id哈希值%50的值来确定要保存到那一个分区. 这样也是为了考虑到zookeeper不擅长大量读写的原因。
所以,如果要一个group用几个consumer来同时读取的话,需要多线程来读取,一个线程相当于一个consumer实例。当consumer的数量大于分区的数量的时候,有的consumer线程会读取不到数据。
假设一个topic test 被groupA消费了,现在启动另外一个新的groupB来消费test,默认test-groupB的offset不是0,而是没有新建立,除非当test有数据的时候,groupB会收到该数据,该条数据也是第一条数据,groupB的offset也是刚初始化的ofsert, 除非用显式的用–from-beginnging 来获取从0开始数据
3、查看topic-group的offsert
位置:zookeeper
路径:[zk: localhost:2181(CONNECTED) 3] ls /brokers/topics/__consumer_offsets/partitions
在zookeeper的topic中有一个特殊的topic __consumer_offserts
计算方法:(放入哪个partitions)
int hashCode = Math.abs(“ttt”.hashCode());
int partition = hashCode % 50;
先计算group的hashCode,再除以分区数(50),可以得到partition的值
使用命令查看: kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 11 --broker-list localhost:9092,localhost:9093,localhost:9094 --formatter “kafka.coordinator.GroupMetadataManager$OffsetsMessageFormatter”
4.参数
auto.offset.reset:默认值为largest,代表最新的消息,smallest代表从最早的消息开始读取,当consumer刚开始创建的时候没有offset这种情况,如果设置了largest,则为当收到最新的一条消息的时候开始记录offsert,若设置为smalert,那么会从头开始读partition
二、
1、Topic
Topic在逻辑上可以被认为是一个queue,每条消费都必须指定它的Topic,可以简单理解为必须指明把这条消息放进哪个queue里。为了使得Kafka的吞吐率可以线性提高,物理上把Topic分成一个或多个Partition,每个Partition在物理上对应一个文件夹,该文件夹下存储这个Partition的所有消息和索引文件。若创建topic1和topic2两个topic,且分别有13个和19个分区,则整个集群上会相应会生成共32个文件夹(本文所用集群共8个节点,此处topic1和topic2 replication-factor均为1),如下图所示。
2、对于传统的message queue而言,一般会删除已经被消费的消息,而Kafka集群会保留所有的消息,无论其被消费与否。当然,因为磁盘限制,不可能永久保留所有数据(实际上也没必要),
因此Kafka提供两种策略删除旧数据。一是基于时间,二是基于Partition文件大小。
例如可以通过配置$KAFKA_HOME/config/server.properties,让Kafka删除一周前的数据,也可在Partition文件超过1GB时删除旧数据,配置如下所示。
这里要注意,因为Kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,所以这里删除过期文件与提高Kafka性能无关。选择怎样的删除策略只与磁盘以及具体的需求有关。另外,Kafka会为每一个Consumer Group保留一些metadata信息——当前消费的消息的position,也即offset。这个offset由Consumer控制。正常情况下Consumer会在消费完一条消息后递增该offset。当然,Consumer也可将offset设成一个较小的值,重新消费一些消息。因为offet由Consumer控制,所以Kafka broker是无状态的,它不需要标记哪些消息被哪些消费过,也不需要通过broker去保证同一个Consumer Group只有一个Consumer能消费某一条消息,因此也就不需要锁机制,这也为Kafka的高吞吐率提供了有力保障。
3、producer
Producer发送消息到broker时,会根据Paritition机制选择将其存储到哪一个Partition。如果Partition机制设置合理,所有消息可以均匀分布到不同的Partition里,这样就实现了负载均衡。如果一个Topic对应一个文件,那这个文件所在的机器I/O将会成为这个Topic的性能瓶颈,而有了Partition后,不同的消息可以并行写入不同broker的不同Partition里,极大的提高了吞吐率。可以在$KAFKA_HOME/config/server.properties中通过配置项num.partitions来指定新建Topic的默认Partition数量,也可在创建Topic时通过参数指定,同时也可以在Topic创建之后通过Kafka提供的工具修改。
在发送一条消息时,可以指定这条消息的key,Producer根据这个key和Partition机制来判断应该将这条消息发送到哪个Parition。Paritition机制可以通过指定Producer的paritition. class这一参数来指定,该class必须实现kafka.producer.Partitioner接口。本例中如果key可以被解析为整数则将对应的整数与Partition总数取余,该消息会被发送到该数对应的Partition。(每个Parition都会有个序号,序号从0开始)
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
public class JasonPartitioner implements Partitioner {
public JasonPartitioner(VerifiableProperties verifiableProperties) {}
@Override
public int partition(Object key, int numPartitions) {
try {
int partitionNum = Integer.parseInt((String) key);
return Math.abs(Integer.parseInt((String) key) % numPartitions);
} catch (Exception e) {
return Math.abs(key.hashCode() % numPartitions);
}
}
}
如果将上例中的类作为partition.class,并通过如下代码发送20条消息(key分别为0,1,2,3)至topic3(包含4个Partition)。
public void sendMessage() throws InterruptedException{
for(int i = 1; i <= 5; i++){
List messageList = new ArrayList<KeyedMessage<String, String>>();
for(int j = 0; j < 4; j++){
messageList.add(new KeyedMessage<String, String>(“topic2”, j+"", "The " + i + " message for key " + j));
}
producer.send(messageList);
}
producer.close();
}
则key相同的消息会被发送并存储到同一个partition里,而且key的序号正好和Partition序号相同。(Partition序号从0开始,本例中的key也从0开始)。下图所示是通过Java程序调用Consumer后打印出的消息列表。
4、consumer group (本节所有描述都是基于Consumer hight level API而非low level API)。
使用Consumer high level API时,同一Topic的一条消息只能被同一个Consumer Group内的一个Consumer消费,但多个Consumer Group可同时消费这一消息。
这是Kafka用来实现一个Topic消息的广播(发给所有的Consumer)和单播(发给某一个Consumer)的手段。一个Topic可以对应多个Consumer Group。如果需要实现广播,只要每个Consumer有一个独立的Group就可以了。要实现单播只要所有的Consumer在同一个Group里。用Consumer Group还可以将Consumer进行自由的分组而不需要多次发送消息到不同的Topic。
实际上,Kafka的设计理念之一就是同时提供离线处理和实时处理。根据这一特性,可以使用Storm这种实时流处理系统对消息进行实时在线处理,同时使用Hadoop这种批处理系统进行离线处理,还可以同时将数据实时备份到另一个数据中心,只需要保证这三个操作所使用的Consumer属于不同的Consumer Group即可。
下面这个例子更清晰地展示了Kafka Consumer Group的特性。首先创建一个Topic (名为topic1,包含3个Partition),然后创建一个属于group1的Consumer实例,并创建三个属于group2的Consumer实例,最后通过Producer向topic1发送key分别为1,2,3的消息。结果发现属于group1的Consumer收到了所有的这三条消息,同时group2中的3个Consumer分别收到了key为1,2,3的消息。
更多推荐
所有评论(0)