Kafka线上问题优化

  • 如何防止消息丢失
    发送方: ack是 1 或者-1/all 可以防止消息丢失,如果要做到99.9999%,ack设成all,把min.insync.replicas配置成分区备份数
    消费方:把自动提交改为手动提交。
  • 如何防止消息的重复消费
    一条消息被消费者消费多次。如果为了消息的不重复消费,而把生产端的重试机制关闭、消费端的手动提交改成自动提交,这样反而会出现消息丢失,那么可以直接在防治消息丢失的手段上再加上消费消息时的幂等性保证,就能解决消息的重复消费问题。
    幂等性如何保证:
    mysql 插入业务id作为主键,主键是唯一的,所以一次只能插入一条
    使用redis或zk的分布式锁(主流的方案)
    - 如何做到顺序消费
    发送方:在发送时将ack不能设置 0 ,关闭重试,使用同步发送,等到发送成功再发送下一条。确保消息是顺序发送的。
    接收方:消息是发送到一个分区中,只能有一个消费组的消费者来接收消息。因此,kafka的顺序消费会牺牲掉性能。
  • 解决消息积压问题

消息积压会导致很多问题,比如磁盘被打满、生产端发消息导致kafka性能过慢,就容易出现服务雪崩,就需要有相应的手段:
方案一:在一个消费者中启动多个线程,让多个线程同时消费。——提升一个消费者的消费能力(增加分区增加消费者)。
方案二:如果方案一还不够的话,这个时候可以启动多个消费者,多个消费者部署在不同的服务器上。其实多个消费者部署在同一服务器上也可以提高消费能力——充分利用服务器的cpu资源。
方案三:让一个消费者去把收到的消息往另外一个topic上发,另一个topic设置多个分区和多个消费者 ,进行具体的业务消费。

  • 延迟队列

延迟队列的应用场景:在订单创建成功后如果超过 30 分钟没有付款,则需要取消订单,此时可用延时队列来实现

创建多个topic,每个topic表示延时的间隔

topic_5s: 延时5s执行的队列
topic_1m: 延时 1 分钟执行的队列
topic_30m: 延时 30 分钟执行的队列
消息发送者发送消息到相应的topic,并带上消息的发送时间
消费者订阅相应的topic,消费时轮询消费整个topic中的消息
如果消息的发送时间,和消费的当前时间超过预设的值,比如 30 分钟
如果消息的发送时间,和消费的当前时间没有超过预设的值,则不消费当前的offset及之后的offset的所有消息都消费
下次继续消费该offset处的消息,判断时间是否已满足预设值。

Kafka-eagle监控平台

kafka配一个web版的管理页面或仪表盘,管理起来更加方便

源码: https://github.com/smartloli/kafka-eagle/
官网:https://www.kafka-eagle.org/
下载: http://download.kafka-eagle.org/
安装文档: https://docs.kafka-eagle.org/2.env-and-install

- 安装JDK
如果Linux服务器上有JDK环境,则可以忽略此步骤,并安装链接的第二部分。如果没有JDK,先到甲骨文官网下载JDK.

- JAVA_HOME 配置:

cd /usr/java
tar -zxvf jdk-xxxx.tar.gz
mv jdk-xxxx jdk1.8
...
vi /etc/profile

export JAVA_HOME=/usr/java/jdk1.8
export PATH=$PATH:$JAVA_HOME/bin

Then, we use the . /etc/profile to enable the configuration to take effect immediately.
java -version 查看

java version "1.8.0_60"
Java(TM) SE Runtime Environment (build 1.8.0_60-b27)
Java HotSpot(TM) 64-Bit Server VM (build 25.60-b23, mixed mode)
  • Kafka Eagle 安装
tar -zxvf kafka-eagle-${version}-bin.tar.gz

Kafka Eagle 配置

vi /etc/profile

export KE_HOME=/data/soft/new/kafka-eagle
export PATH=$PATH:$KE_HOME/bin

Then, we use the . /etc/profile to enable the configuration to take effect immediately.

  • Kafka Eagle system-config file 配置文件
# 配置zk  去掉cluster2
efak.zk.cluster.alias=cluster1
cluster1.zk.list=10.100.12.246:2181
# cluster2.zk.list=xdn10:2181,xdn11:2181,xdn12:2181

# 配置mysql
kafka.eagle.driver=com.mysql.cj.jdbc.Driver
kafka.eagle.url=jdbc:mysql://172.16.253.22:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
kafka.eagle.username=root
kafka.eagle.password= 123456

对于数据库的配置,可以选择数据库也可以选择sqlite,其实更推荐sqlLite ,因为他不需要另外安装软件。

  • 启动 Kafka Eagle
cd ${KE_HOME}/bin
chmod +x ke.sh
./ke.sh start

到此就完成了 Kafka Eagle 的安装。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐