一. Kafka出现的问题

          问题:Kafka是当下流行的高并发消息中间件,能够高效并实时的吞吐数据,而且通过副本冗余机制保证了数据安全。

                     但还是会出现  丢包 or 重复消费  问题

二.Kafka生产消息流程   

2.1  生产者命令

         创建主题时,就已经指定了分区数 副本数

         sh kafka-console-producer.sh        --broker-list IP:PORT        --topic topic_name

3.1 生产消息流程(以指定分区的为例)

         |—— producer根据指定的partition方法(round-robin hash),将消息发布到指定topic的partition里面

         2.1 查找: Producer从zk的/brokers/..state节点中查找该分区的leader

         2.2 发送:Producer发送消息到leader,并记录到本地log

         2.3 拉取:follower从leader拉取消息后,记录到本地log,并向leader发ack消息

         2.4 更新: leader收到所有ISR所有的ack确认后,更新HW             

三.Kafka消费消息流程

         3.1 fetch: Consumer发送FetchRequest到leader所在的broker

         3.2 查找:Broker获取消息的start offset和size,查找相应的消息———根据offset查找message

         3.3 提交:Consumer  fetch到消息后,提交偏移量

四. Kafka出现的丢包问题,如何保证不丢

        (1s内丢进了5w条数据,持续几分钟)

           场景:某时间端用户流量激增,导致服务器网卡爆满,磁盘卡死等

           解决:首先对kafka限速,其次启动重试机制(leader收到所有ISR的ack消息后,更新HW点,向Client发送ack消息)

五. Kafka出现的重发问题,如何保证不重

           根本原因:根本原因是Kafka已经消费了数据,但是没有提交offset序列号

                 场景:1. Consumer消费一条数据平均需要200ms时间,某时刻用户流量激增

                                Consumer取出一批数据进行消费,但是在session.timeout.ms时间之内没有消费完成

                                导致consumer coordinator挂掉,kafka的自动提交失败(offset不再更新),Client重新分配分区

                                此时重新分配分区的Consumer重复消费了之前发送的数据

                            2. 消费逻辑 与 业务逻辑在一个线程处理,可能会出现业务逻辑阻塞,导致session超时

                     解决:增加partition数量,提高Consumer并发能力 

                                适当延长session.timeout.ms的时间

                                对于单partition的Consumer线 程,设置一个线程池提高并发能力

                                减少每次从分区中获取的 数据分片的大小

                                offset手动处理,业务逻辑成功后,再提交offset

                                通过数据库 或 redis,保证数据唯一(唯一索引,先查询是否存在) ——效率低

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐