公司项目遇到消息队列数据丢失的问题,以此记录下解决方案。

如果kafka的leader挂掉,切换副本节点时产生数据丢失问题,此时一般是要求起码设置如下 4 个参数:

  • 给 topic 设置 replication.factor 参数:这个值必须大于 1,要求每个 partition 必须有至少 2 个副本。
  • 在 Kafka 服务端设置 min.insync.replicas 参数:这个值必须大于 1,这个是要求一个 leader 至少感知到有至少一个 follower 还跟自己保持联系,没掉队,这样才能确保 leader 挂了还有一个 follower 吧。
  • 在 producer 端设置 acks=all:这个是要求每条数据,必须是写入所有 replica 之后,才能认为是写成功了
  • 在 producer 端设置 retries=MAX(很大很大很大的一个值,无限次重试的意思):这个是要求一旦写入失败,就无限重试,根据业务我们设置的三次重试。

kafka保证消息可靠性,可以通过如下几个配置:

  1. 生产者配置 acks = all (-1) ISR中的所有副本都写入成功,才代表该消息发送成功
  1. min.insync.replicas默认为1,指定了ISR集合中最小的副本数,不满足条件就会抛出NotEnoughReplicasException 或 NotEnoughReplicasAfterAppendException,也就是必须保证有一个副本同步数据跟得上leader
  1. unclean.leader.election.enable 默认为false, 当leader下线的时候可以从非ISR集合中选举新的leader,这样能提高可用性,但会丢数据
  1. 配置log.flush.interval.messages 和 log.flush.interval.ms 同步刷盘策略
  1. 消费端手动提交位移,enable.auto.commit 设置为false,自动提交会带来重复消费和消息丢失的问题,客户端如果消息消费失败应该放入死信队列,以便后期排除故障
  1. 回溯消费功能,消息貌似已经成功消费,但实际消息失败了,不知道是什么错误造成的,就可以通过回溯消费补偿,或者复现 “丢失”,seek()方法

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐