消息队列&&Kafka&&基础使用
Kafka 是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。如何快速部署一个高可用 Kafka 集群发布/订阅: 消息的发布者不会将消息直接发送给特定的订阅者,而是将发布的消息 分为不同的类别,订阅者只接收感兴趣的消息。削峰填谷应用解耦异步处理。
消息队列&&Kafka&&基础使用
前言
Kafka 是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。
如何快速部署一个高可用 Kafka 集群
发布/订阅: 消息的发布者不会将消息直接发送给特定的订阅者,而是将发布的消息 分为不同的类别,订阅者只接收感兴趣的消息。
应用场景:
- 削峰填谷
- 应用解耦
- 异步处理
相关概念
主题(Topic)
在 Kafka 中,发布订阅的对象是主题(Topic),你可以为每个业务、每个应用甚至是每类数据都创建专属的主题。
生产者(Producer)
消息生产者,就是向 kafka broker 发消息的客户端,生产者程序通常持续不断地向一个或多个主题发送消息。
消费者(Consumer)
消息消费者,负责从Broker中拉取(Pull)订阅的消息并进行消费,通常多个消费者构成一个分组,消息只能被同组中的一个消费者消费。
消费者组Consumer Group (CG)
由多个 consumer 组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费,消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
Broker
服务实例,负责消息的持久化、中转等功能。一台 kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker 可以容纳多个 topic。
主题(topic)
消息主题。Kafka 按 topic 对消息进行分类,我们在收发消息时只需指定 topic。
分区(Partition)
分区。为了提升系统的吞吐,一个 topic 下通常有多个 partition,partition 分布在不同的 Broker 上,用于存储 topic 的消息,这使 Kafka 可以在多台机器上处理、存储消息,给 kafka 提供给了并行的消息处理能力和横向扩容能力。另外,为了提升系统的可靠性,partition 通常会分组,且每组有一个主 partition、多个副本 partition,且分布在不同的 broker 上,从而起到容灾的作用。
副本(Replica)
为保证集群中的某个节点发生故障时,该节点上的 partition 数据不丢失,且 kafka 仍然能够继续工作,kafka 提供了副本机制,一个 topic 的每个分区都有若干个副本, 一个 leader 和若干个 follower。
leader
每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是 leader。
follower
每个分区多个副本中的“从”,实时从 leader 中同步数据,保持和 leader 数据 的同步。leader 发生故障时,某个 follower 会成为新的 follower。
Replica 副本机制
Kafka 每组分区通常有多个副本,同组分区的不同副本分布在不同的 Broker 上,保存相同的消息(可能有滞后)。副本之间是“一主多从”的关系,其中 leader 副本负责处理读写请求,follower 副本负责从 leader 拉取消息进行同步。分区的所有副本统称为 AR(Assigned Replicas),其中所有与 leader 副本保持一定同步的副本(包括 leader 副本在内)组成 ISR(In-Sync Replicas),与 leader 同步滞后过多的副本组成 OSR(Out-of-Sync Replicas),由此可见,AR=ISR+OSR。
follower 副本是否与 leader 同步的判断标准取决于 Broker 端参数 replica.lag.time.max.ms(默认为 10 秒),follower 默认每隔 500ms 向 leader fetch 一次数据,只要一个 Follower 副本落后 Leader 副本的时间不连续超过 10 秒,那么 Kafka 就认为该 Follower 副本与 leader 是同步的。在正常情况下,所有的 follower 副本都应该与 leader 副本保持一定程度的同步,即 AR=ISR,OSR 集合为空。
当 leader 副本所在 Broker 宕机时,Kafka 会借助 ZK 从 follower 副本中选举新的 leader 继续对外提供服务,实现故障的自动转移,保证服务可用。为了使选举的新 leader 和旧 leader 数据尽可能一致,当 leader 副本发生故障时,默认情况下只有在 ISR 集合中的副本才有资格被选举为新的 leader,而在 OSR 集合中的副本则没有任何机会(可通过设置 unclean.leader.election.enable 改变)。
当 Kafka 通过多副本机制解决单机故障问题时,同时也带来了多副本间数据同步一致性问题。Kafka 通过高水位更新机制、副本同步机制、 Leader Epoch 等多种措施解决了多副本间数据同步一致性问题,
segment
分段。宏观上看,一个 partition 对应一个日志(Log)。由于生产者生产的消息会不断追加到 log 文件末尾,为防止 log 文件过大导致数据检索效率低下,Kafka 采取了分段和索引机制,将每个 partition 分为多个 segment,同时也便于消息的维护和清理。每个 segment 包含一个.log 日志文件、两个索引(.index、timeindex)文件以及其他可能的文件。每个 Segment 的数据文件以该段中最小的 offset 为文件名,当查找 offset 的 Message 的时候,通过二分查找快找到 Message 所处于的 Segment 中。
offset
消息在日志中的位置,消息在被追加到分区日志文件的时候都会分配一个特定的偏移量。offset 是消息在分区中的唯一标识,是一个单调递增且不变的值。Kafka 通过它来保证消息在分区内的顺序性,不过 offset 并不跨越分区,也就是说,Kafka 保证的是分区有序而不是主题有序。
消息日志(Log)
Kafka 使用消息日志(Log)来保存数据,一个日志就是磁盘上一个只能追加写(Append-only)消息的物理文件。因为只能追加写入,故避免了缓慢的随机 I/O 操作,改为性能较好的顺序 I/O 写操作,这也是实现 Kafka 高吞吐量特性的一个重要手段。
为了避免不断地日志写入耗尽磁盘空间,Kafka通过日志段(Log Segment)机制定期的删除消息以回收磁盘。
在 Kafka 底层,一个日志又近一步细分成多个日志段,消息被追加写到当前最新的日志段中,当写满了一个日志段后,Kafka 会自动切分出一个新的日志段,并将老的日志段封存起来。Kafka 在后台还有定时任务会定期地检查老的日志段是否能够被删除,从而实现回收磁盘空间的目的。
集群管理
Kafka 借助 ZooKeeper 进行集群管理。Kafka 中很多信息都在 ZK 中维护,如 broker 集群信息、consumer 集群信息、 topic 相关信息、 partition 信息等。Kafka 的很多功能也是基于 ZK 实现的,如 partition 选主、broker 集群管理、consumer 负载均衡等。
Producer端直接连接Broker,不在zk上存任何数据,只是通过zk监听Broker和Topic等信息。
Broker 异步刷盘机制
kafka 为了获得更高吞吐,Broker 接收到消息后只是将数据写入 PageCache 后便认为消息已写入成功,而 PageCache 中的数据通过 linux 的 flusher 程序进行异步刷盘(刷盘触发条:主动调用 sync 或 fsync 函数、可用内存低于阀值、dirty data 时间达到阀值),将数据顺序写到磁盘。消息处理示意图如下:
由于消息是写入到 pageCache,单机场景,如果还没刷盘 Broker 就宕机了,那么 Producer 产生的这部分数据就可能丢失。为了解决单机故障可能带来的数据丢失问题,Kafka 为分区引入了副本机制。
系统架构
部署搭建
机器规划
下载并解压安装包
本次 Kafka 搭建的版本是 2.7.1,下载地址可以在 [Kafka 官网下载页面] (https://kafka.apache.org/downloads) 中找到。将下载好的安装包解压到 /usr/local/kafka 目录。
部署 Zookeeper
Kafka 官网提供的压缩包中包含了 Zookeeper 所需的文件,我们可以直接使用 Kafka 提供的文件来部署 Zookeeper。当然你可以单独下载 Zookeeper 的安装包来部署。
mkdir -p /usr/local/zk
# 编辑 config/zookeeper.properties 文件,3 台 Zookeeper 节点的配置文件是相同的。
vi config/zookeeper.properties
#ZooKeeper 使用的基本时间单位(毫秒),心跳超时时间是 tickTime 的两倍
tickTime=2000
#Leader 和 Follower 初始连接时最多能容忍的最多心跳数(2000 * 10 = 20s)
initLimit=10
#Leader 和 Follower 节点之间请求和应答之间能容忍的最多心跳数(2000 * 5 = 10s)
syncLimit=5
#数据目录
dataDir=/usr/local/zk
#监听客户端连接的端口
clientPort=2181
#最大客户端连接数
maxClientCnxns=60
#集群信息(服务器编号,服务器地址,Leader-Follower 通信端口,选举端口)
server.1=192.168.1.6:2888:3888
server.2=192.168.1.7:2888:3888
server.3=192.168.1.8:2888:3888
#不启动 jetty 管理页面服务
admin.enableServer=false
#运行所有四字指令
4lw.commands.whitelist=*
设置节点 id,分别为 3 台 Zookeeper 节点设置不同的节点 id。
#节点 1
echo "1" > /usr/local/zk/myid
#节点 2
echo "2" > /usr/local/zk/myid
#节点 3
echo "3" > /usr/local/zk/myid
# 启动 Zookeeper
# 在 3 台机器上分别使用以下命令启动 Zookeeper。
zookeeper-server-start.sh -daemon config/zookeeper.properties
# 查看 Zookeeper
zookeeper-shell.sh 192.168.1.6:2181
get /zookeeper/config
部署 Kafka
vim config/server.propertie
# 每台 Kafka 节点除了以下配置以外,其余配置相同:
############################# Server Basics #############################
# 每个 Broker 的 id 必须唯一,分别设置为 0,1,2。
broker.id=0
############################# Socket Server Settings #############################
# Kafka Broker 监听地址和端口
listeners=PLAINTEXT://192.168.1.6:9092
# Broker 用于处理网络请求的线程数,默认值 3。
num.network.threads=6
# Broker 用于处理 I/O 的线程数,推荐值 8 * 磁盘数,默认值 8
num.io.threads=120
# 在网络线程停止读取新请求之前,可以排队等待 I/O 线程处理的最大请求个数,默认值 500。增大queued.max.requests 能够缓存更多的请求。
queued.max.requests=1000
# socket 发送缓冲区大小
socket.send.buffer.bytes=102400
# socket 接收缓冲区大小
socket.receive.buffer.bytes=102400
# socket 接收请求的最大值(防止 OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
# 数据存放目录,我们在每台机器上使用 15 块硬盘,每块硬盘单独挂载一个目录。
log.dirs=/data1,/data2,/data3,/data4,/data5,/data6,/data7,/data8,/data9,/data10,/data11,/data12,/data13,/data14,/data15
#清理过期数据线程数
num.recovery.threads.per.data.dir=3
#单条消息最大 10 M
message.max.bytes=10485760
############################# Topic Settings #############################
#不允许自动创建 Topic
auto.create.topics.enable=false
#不允许 Unclean Leader 选举。
unclean.leader.election.enable=false
#不允许定期进行 Leader 选举。
auto.leader.rebalance.enable=false
# Topic 的默认分区数。
num.partitions=3
# Topic 中每个分区的默认副本数。
default.replication.factor=3
#当生产者将 acks 设置为 "all"(或"-1")时,此配置指定必须确认写入的副本的最小数量,才能认为写入成功
min.insync.replicas=2
#允许删除主题
delete.topic.enable=true
############################# Log Flush Policy #############################
#建议由操作系统使用默认设置执行后台刷新
#日志落盘消息条数阈值
#log.flush.interval.messages=10000
#日志落盘时间间隔
#log.flush.interval.ms=1000
#检查是否达到flush条件间隔
#log.flush.scheduler.interval.ms=200
############################# Log Retention Policy #############################
#日志留存时间 7 天
log.retention.hours=168
#最多存储 58TB 数据
log.retention.bytes=63771674411008
#日志文件中每个 segment 的大小为 1G
log.segment.bytes=1073741824
#检查 segment 文件大小的周期 5 分钟
log.retention.check.interval.ms=300000
#开启日志压缩
log.cleaner.enable=true
#日志压缩线程数
log.cleaner.threads=8
############################# Zookeeper #############################
#Zookeeper 连接参数
zookeeper.connect=192.168.1.6:2181,192.168.1.7:2181,192.168.1.8:2181
#连接 Zookeeper 的超时时间
zookeeper.connection.timeout.ms=6000
############################# Group Coordinator Settings #############################
#为了缩短多消费者首次平衡的时间,这段延时期间 10s 内允许更多的消费者加入组
group.initial.rebalance.delay.ms=10000
#心跳超时时间默认 10s,设置成 6s 主要是为了让 Coordinator 能够更快地定位已经挂掉的 Consumer
session.timeout.ms = 6s。
#心跳间隔时间,session.timeout.ms >= 3 * heartbeat.interval.ms。
heartbeat.interval.ms=2s
#最长消费时间 5 分钟
max.poll.interval.ms=300000
启动 Kafka
kafka-server-start.sh -daemon config/server.properties
查看 Kafka 集群
#连接 Zookeeper
zookeeper-shell.sh 127.0.0.1:2181
#查看 Kafka 节点
ls /brokers/ids
[0, 1, 2]
#查看 Kafka Controller
get /controller
{"version":1,"brokerid":0,"timestamp":"1631005545929"}
部署 Kafka Eagle 可视化工具
Kafka Eagle 是一款 Kafka 可视化和管理软件,支持对多个不同版本的 Kafka 集群进行管理。Kafka Eagle 可以监控 Kafka 集群的健康状态,消费者组的消费情况,创建和删除 Topic,支持使用 KSQL 对 Kafka 消息做 Ad-hoc 查询,支持多种告警方式等等。
Kafka Eagle
vim /etc/profile
export KE_HOME=/usr/local/kafka-eagle/kafka-eagle-web-2.0.6
export PATH=$PATH:$KE_HOME/bin
vim conf/system-config.properties
######################################
# Kafka 集群列表
######################################
kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=192.168.1.6:2181,192.168.1.7:2181,192.168.1.8:2181
######################################
# Zookeeper 线程池最大连接数
######################################
kafka.zk.limit.size=32
######################################
# Kafka Eagle 的页面访问端口
######################################
kafka.eagle.webui.port=8048
######################################
# 存储消费信息的类型,在 0.9 版本之前,消费
# 信息会默认存储在 Zookeeper 中,存储类型
# 设置 Zookeeper 即可;如果是在 0.10 版本之后,
# 消费者信息默认存储在 Kafka 中,存储类型
# 设置为 kafka。
######################################
cluster1.kafka.eagle.offset.storage=kafka
######################################
# Kafka JMX 指标监控,Kafka 需要开启 JMX
######################################
cluster1.kafka.eagle.jmx.uri=service:jmx:rmi:///jndi/rmi://%s/jmxrmi
######################################
# 开启性能监控,数据默认保留 30 天
######################################
kafka.eagle.metrics.charts=true
kafka.eagle.metrics.retain=15
######################################
# kafka sql topic records max
######################################
kafka.eagle.sql.topic.records.max=5000
kafka.eagle.sql.topic.preview.records.max=10
######################################
# 删除 Kafka Topic 时需要输入的密钥
######################################
kafka.eagle.topic.token=keadmin
######################################
# 存储 Kafka Eagle 元数据信息的数据库
# 目前支持 MySQL 和 Sqlite,默认使用 Sqlite 进行存储
######################################
kafka.eagle.driver=org.sqlite.JDBC
kafka.eagle.url=jdbc:sqlite:/usr/local/kafka-eagle/kafka-eagle-web-2.0.6/db/ke.db
kafka.eagle.username=root
kafka.eagle.password=123456
# 启动 Kafka Eagle
bin/ke.sh start
迁移与回退
阶段一:停止生产端相关业务
阶段二:切换至新 kafka 生产消费
回退阶段一:停止生产端相关业务
回退阶段二:应用切回到原来的kafka
更多推荐
所有评论(0)