一 :消息队列

1:什么是消息队列

        消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。

         消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到MQ 中而不用管谁来取,消息使用者只管从MQ中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。

2:为什么需要消息队列

(1)解耦

        允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。

(2)冗余

        消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。

(3)扩展性

        因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。

(4)灵活性&峰值处理能力

        在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。

(5)可恢复性

        系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统复后被处理。

(6)顺序保证:

        在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。(Kafka 保证一个 Partition 内的消息的有序性)

(7)缓冲

        有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。

(8)异步通信

        很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。

二:Kafka 基础与入门

1:kafka基本概念

        Kafka 是一种高吞吐量的分布式发布/订阅消息系统,这是官方对 kafka 的定义,这样说起来,可能不太好理解,这里简单举个例子:现在是个大数据时代,各种商业、社交、搜索、浏览都会产生大量的数据。那么如何快速收集这些数据,如何实时的分析这些数据,是一个必须要解决的问题,同时,这也形成了一个业务需求模型,即生产者生产(produce)各种数据,消费者(consume)消费(分析、处理)这些数据。那么面对这些需求,如何高效、稳定的完成数据的生产和消费呢?这就需要在生产者与消费者之间,建立一个通信的桥梁,这个桥梁就是消息系统。从微观层面来说,这种业务需求也可理解为不同的系统之间如何传递消息。

        kafka 是 Apache 组织下的一个开源系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop平台的数据分析、低时延的实时系统、storm/spark 流式处理引擎等。kafka 现在它已被多家大型公司作为多种类型的数据管道和消息系统使用。

2:kafka 角色术语

        kafka的一些核心概念和角色

序号

术语

描述

1

Broker

Kafka集群中的一个或多个服务器,负责消息的存储和转发。

2

Topic

消息的分类,Kafka中发布消息时指定的主题,所有相关消息均归类于同一Topic。

3

Producer

消息的生产者,负责将消息发布到Kafka的Topic中。

4

Consumer

消息的消费者,从Kafka的Topic中拉取并消费消息。

5

Partition

每个Topic包含的一个或多个分区,每个Partition都是一个有序的队列,消息按顺序存储,并被分配一个唯一的offset。

6

Consumer Group

消费者组,Kafka中的一组消费者,共享一个Topic中的多个Partition的订阅和消费。同一个消费者组内的消费者可以分担消息处理的负载。

7

Message

消息,Kafka通信的基本单位,由Producer发布到Topic中,并由Consumer消费。

3:kafka拓扑架构

        一个典型的 Kafka 集群包含若干 Producer,若干broker、若干Consumer Group,以及一个 Zookeeper 集群。Kafka通过 Zookeeper 管理集群配置,选举 leader,以及在Consumer Group 发生变化时进行 rebalance。Producer 使用 push 模式将消息发布到broker,Consumer 使用 pu11 模式从 broker 订阅并消费消息。典型架构如下图所示:

        从图中可以看出,典型的消息系统有生产者(Producer),存储系统(broker)和消费者(Consumer)组成,Kafka作为分布式的消息系统支持多个生产者和多个消费者,生产者可以将消息分布到集群中不同节点的不同 Partition 上,消费者也可以消费集群中多个节点上的多个 Partition。在写消息时允许多个生产者写到同一个 Partition 中,但是读消息时一个 Partition 只允许被一个消费组中的一个消费者所消费,而一个消费者可以消费多个 Partition。也就是说同一个消费组下消费者对 Partition 是互斥的,而不同消费组之间是共享的

         kafka 支持消息持久化存储,持久化数据保存在 kafka的日志文件中,在生产者生产消息后,kafka不会直接把消息传递给消费者,而是先要在broker 中进行存储,为了减少磁盘写入的次数,broker 会将消息暂时缓存起来,当消息的个数或尺寸、大小达到一定阀值时,再统一写到磁盘上,这样不但提高了 kafka 的执行效率,也减少了磁盘 I0 调用次数。kafka 中每条消息写到 partition 中,是顺序写入磁盘的,这个很重要,因为在机械盘中如果是随机写入的话,效率将是很低的,但是如果是顺序写入,那么效率却是非常高,这种顺序写入磁盘机制是 kafka高吞吐率的一个很重要的保证。

4:Topic和partition

        Kafka 中的 topic(主题)是以 partition 的形式存放的,每一个 topic 都可以设置它的 partition数量,Partition的数量决定了组成topic的log的数量。推荐partition的数量一定要大于同时运行的 consumer 的数量。另外,建议 partition 的数量要小于等于集群 broker 的数量,这样消息数据就可以均匀的分布在各个 broker中

5:Producer 生产机制

        Producer 是消息和数据的生产者,它发送消息到 broker 时,会根据 Paritition 机制选择将其存储到哪一个 Partition。如果 Partition 机制设置的合理,所有消息都可以均匀分布到不同的 Partition 里,这样就实现了数据的负载均衡。如果一个 Topic 对应一个文件,那这个文件所在的机器 I/0 将会成为这个 Topic 的性能瓶颈,而有了 Partition后,不同的消息可以并行写入不同 broker 的不同 Partition 里,极大的提高了吞吐率。

6:Consumer 消费机制

        Kafka 发布消息通常有两种模式:队列模式(queuing)和发布/订阅模式(publish-subscribe)。在队列模式下,只有一个消费组,而这个消费组有多个消费者,一条消息只能被这个消费组中的一个消费者所消费;而在发布/订阅模式下,可有多个消费组,每个消费组只有一个消费者,同一条消息可被多个消费组消费。Kafka 中的 Producer 和 consumer 采用的是 push、pu11 的模式,即 producer 向 broker进行 push 消息,comsumer 从 bork 进行 pu11 消息,push 和 pu11 对于消息的生产和消费是异步进行的。pu11模式的一个好处是consumer 可自主控制消费消息的速率,同时consumer 还可以自己控制消费消息的方式是批量的从 broker 拉取数据还是逐条消费数据

三:zookeeper 概念介绍

        ZooKeeper 是一种分布式协调技术,所谓分布式协调技术主要是用来解决分布式环境当中多个进程之间的同步控制,让他们有序的去访问某种共享资源,防止造成资源竞争(脑裂)的后果。脑裂是指在主备切换时,由于切换不彻底或其他原因,导致客户端和 slave 误以为出现两个 activemaster,最终使得整个集群处于混乱状态

        ZooKeeper 是一种为分布式应用所设计的高可用、高性能的开源协调服务,它提供了一项基本服务:分布式锁服务,同时,也提供了数据的维护和管理机制,如:统一命名服务、状态同步服务、集群管理、分布式消息队列、分布式应用配置项的管理等等。

1:zookeeper 应用举例

(1)什么是单点故障问题呢?

        所谓单点故障,就是在一个主从的分布式系统中,主节点负责任务调度分发,从节点负责任务的处理,而当主节点发生故障时,整个应用系统也就瘫痪了,那么这种故障就称为单点故障。那我们的解决方法就是通过对集群master 角色的选取,来解决分布式系统单点故障的问题。

        传统的方式是采用一个备用节点,这个备用节点定期向主节点发送 ping包,主节点收到 ping 包以后向备用节点发送回复 Ack 信息,当备用节点收到回复的时候就会认为当前主节点运行正常,让它继续提供服务。而当主节点故障时,备用节点就无法收到回复信息了,此时,备用节点就认为主节点宕机,然后接替它成为新的主节点继续提供服务。

        这种传统解决单点故障的方法,虽然在一定程度上解决了问题,但是有一个隐患,就是网络问题,可能会存在这样一种情况:主节点并没有出现故障,只是在回复ack 响应的时候网络发生了故障,这样备用节点就无法收到回复,那么它就会认为主节点出现了故障,接着,备用节点将接管主节点的服务,并成为新的主节点,此时,分布式系统中就出现了两个主节点(双 Master 节点)的情况,双Master 节点的出现,会导致分布式系统的服务发生混乱。这样的话,整个分布式系统将变得不可用。为了防止出现这种情况,就需要引入ZooKeeper 来解决这种问题。

2:zookeeper 的工作原理是什么?

(1)master 启动

        在分布式系统中引入 Zookeeper 以后,就可以配置多个主节点,这里以配置两个主节点为例,假定它们是 主节点A 和 主节点B,当两个主节点都启动后,它们都会向 ZooKeeper中注册节点信息。我们假设 主节点A锁注册的节点信息是 master00001, 主节点B注册的节点信息是 master00002,注册完以后会进行选举,选举有多种算法,这里以编号最小作为选举算法,那么编号最小的节点将在选举中获胜并获得锁成为主节点,也就是主节点A 将会获得锁成为主节点,然后 主节点B将被阻塞成为一个备用节点。这样,通过这种方式 Zookeeper 就完成了对两个Master 进程的调度。完成了主、备节点的分配和协作。

(2)master 故障

        如果 主节点A发生了故障,这时候它在 ZooKeeper 所注册的节点信息会被自动删除,而 ZooKeeper 会自动感知节点的变化,发现 主节点A 故障后,会再次发出选举,这时候 主节点B 将在选举中获胜,替代主节点A 成为新的主节点,这样就完成了主、被节点的重新选举。

(3)master 恢复

        如果主节点恢复了,它会再次向 ZooKeeper 注册自身的节点信息,只不过这时候它注册的节点信息将会变成 master00003 ,而不是原来的信息。ZooKeeper 会感知节点的变化再次发动选举,这时候主节点B在选举中会再次获胜继续担任 主节点,主节点A会担任备用节点。

        zookeeper 就是通过这样的协调、调度机制如此反复的对集群进行管理和状态同步的

3:zookeeper 集群架构

        zookeeper 一般是通过集群架构来提供服务的,下图是 zookeeper 的基本架构图。

        zookeeper集群主要角色有server和client,其中server又分为leader、follower和 observer 三个角色,每个角色的含义如下:Leader:领导者角色,主要负责投票的发起和决议,以及更新系统状态。follower:跟随着角色,用于接收客户端的请求并返回结果给客户端,在选举过程中参与投票。

        observer:观察者角色,用户接收客户端的请求,并将写请求转发给1eader,同时同步leader 状态,但是不参与投票。0bserver 目的是扩展系统,提高缩性。client:客户端角色,用于向 zookeeper 发起请求。

4:zookeeper 的工作流程

        Zookeeper 修改数据的流程:Zookeeper 集群中每个 Server 在内存中存储了一份数据,在 Zookeeper 启动时,将从实例中选举一个Server 作为leader,Leader 负责处理数据更新等操作,当且仅当大多数 Server 在内存中成功修改数据,才认为数据修改成功。Zookeeper 写的流程为:客户端client 首先和一个 Server 或者 0bserve 通信,发起写请求,然后 Server 将写请求转发给 Leader,Leader 再将写请求转发给其它 Server,其它 Server 在接收到写请求后写入数据并响应 Leader,Leader 在接收到大多数写成功回应后,认为数据写成功,最后响应 Client,完成一次写操作过程。

四:Zookeeper在Kafka中的作用

1:Broker 注册

        Broker 是分布式部署并且相互之间相互独立,但是需要有一个注册系统能够将整个集群中的 Broker 管理起来,此时就使用到了 Zookeeper。在 Zookeeper 上会有一个专门用来进行 Broker 服务器列表记录的节点:/brokers/ids

        每个 Broker 在启动时,都会到Zookeeper 上进行注册,即到/brokers/ids 下创建属于自己的节点,如/brokers/ids/[0...N]Kafka 使用了全局唯一的数字来指代每个 Broker 服务器,不同的 Broker 必须使用不同的Broker ID 进行注册,创建完节点后,每个Broker 就会将自己的 IP 地址和端口信息记录到该节点中去。其中,Broker 创建的节点类型是临时节点,一旦Broker 宕机,则对应的临时节点也会被自动删除。

2:Topic 注册

        在 Kafka 中,同一个 Topic 的消息会被分成多个分区并将其分布在多个 Broker 上,这些分区信息及与 Broker 的对应关系也都是由 Zookeeper 在维护,由专门的节点来记录,如:/borkers/topics

        Kafka 中每个Topic 都会以/brokers/topics/[topic]的形式被记录,如/brokers/topics/login和/brokers/topics/search等。Broker 服务器启动后,会到对应 Topic 节点(/brokers/topics)上注册自己的 Broker ID 并写入针对该 Topic 的分区总数,如/brokers/topics/login/3-、>2,这个节点表示Broker ID为3的一个Broker服务器,对于"1ogin"这个Topic 的消息,提供了2个分区进行消息存储,同样,这个分区节点也是临时节点。

3:生产者负载均衡

        由于同一个 Topic 消息会被分区并将其分布在多个Broker 上,因此,生产者需要将消息合理地发送到这些分布式的 Broker 上,那么如何实现生产者的负载均衡,Kafka 支持传统的四层负载均衡,也支持 Zookeeper 方式实现负载均衡。

(1)四层负载均衡

        根据生产者的 IP 地址和端口来为其确定一个相关联的 Broker。通常,一个生产者只会对应单个 Broker,然后该生产者产生的消息都发往该 Broker。这种方式逻辑简单,每个生产者不需要同其他系统建立额外的 TCP 连接,只需要和 Broker 维护单个 TCP 连接即可。但是,其无法做到真正的负载均衡,因为实际系统中的每个生产者产生的消息量及每个Broker 的消息存储量都是不一样的,如果有些生产者产生的消息远多于其他生产者的话,那么会导致不同的 Broker 接收到的消息总数差异巨大,同时,生产者也无法实时感知到Broker 的新增和删除。

(2)使用 Zookeeper 进行负载均衡

        由于每个 Broker 启动时,都会完成 Broker 注册过程,生产者会通过该节点的变动态地感知到 Broker 服务器列表的变更,这样就可以实现动态的负载均衡机制。

5:分区 与 消费者 的关系

        消费组(Consumer Group)下有多个 Consumer(消费者)。对于每个消费者组(Consumer Group),Kafka 都会为其分配一个全局唯一的 Group ID,Group 内部的所有消费者共享该 ID。订阅的topic 下的每个分区只能分配给某个 group 下的一个consumer(当然该分区还可以被分配给其他group)。同时,Kafka 为每个消费者分配一个Consumer ID,通常采用"Hostname:UUID"形式表示。

6:消息消费进度 0ffset 记录

        在消费者对指定消息分区进行消息消费的过程中,需要定时地将分区消息的消费进度Offset 记录到Zookeeper 上,以便在该消费者进行重启或者其他消费者重新接管该消息分区的消息消费后,能够从之前的进度开始继续进行消息消费。0ffset 在 Zookeeper 中由一个专门节点进行记录,其节点路径为:/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]节点内容就是 Offset 的值。

7:消费者注册

        消费者服务器在初始化启动时加入消费者分组的步骤如下:

(1)注册到消费者分组

        每个消费者服务器启动时,都会到 Zookeeper 的指定节点下创建一个属于自己的消费者节点,例如/consumers/[group_id]/ids/[consumer_id],完成节点创建后,消费者就会将自己订阅的 Topic 信息写入该临时节点。(2)对消费者分组中的消费者的变化注册监听每个消费者都需要关注所属消费者分组中其他消费者服务器的变化情况,即对/consumers/[group_id]/ids 节点注册子节点变化的 Watcher 监听,一旦发现消费者新增或减少,就触发消费者的负载均衡。

(3)对 Broker 服务器变化注册监听

        消费者需要对/broker/ids/[0-N]中的节点进行监听,如果发现 Broker 服务器列表发生变化,那么就根据具体情况来决定是否需要进行消费者负载均衡。

(4)进行消费者负载均衡

        为了让同一个Topic下不同分区的消息尽量均衡地被多个 消费者 消费而进行 消费者与 消息分区分配的过程,通常,对于一个消费者分组,如果组内的消费者服务器发生变更或Broker 服务器发生变更,会发出消费者负载均衡。

五.群集部署

一、环境准备
  • 主机信息
    • kafka1:192.168.10.101
    • kafka2:192.168.10.102
    • kafka3:192.168.10.103
二、Zookeeper部署
1. 安装Java

在每个Kafka节点上安装Java环境(以kafka1为例):

[root@kafka1 ~]# yum -y install java
2. 安装Zookeeper
  • 解压Zookeeper到指定目录:

[root@kafka1 ~]# tar zxvf apache-zookeeper-3.6.0-bin.tar.gz  
[root@kafka1 ~]# mv apache-zookeeper-3.6.0-bin /etc/zookeeper

创建数据保存目录:

[root@kafka1 ~]# mkdir /etc/zookeeper/zookeeper-data
3. 修改配置文件
  • 复制并编辑配置文件:
[root@kafka1 ~]# cd /etc/zookeeper/conf  
[root@kafka1 conf]# mv zoo_sample.cfg zoo.cfg  
[root@kafka1 conf]# vim zoo.cfg

 在zoo.cfg中设置:

dataDir=/etc/zookeeper/zookeeper-data  
clientPort=2181  
server.1=192.168.10.101:2888:3888  
server.2=192.168.10.102:2888:3888  
server.3=192.168.10.103:2888:3888
4. 创建节点ID文件
  • 每个节点上创建对应的myid文件:
# 节点1  
[root@kafka1 conf]# echo '1' > /etc/zookeeper/zookeeper-data/myid  
# 节点2  
[root@kafka2 conf]# echo '2' > /etc/zookeeper/zookeeper-data/myid  
# 节点3  
[root@kafka3 conf]# echo '3' > /etc/zookeeper/zookeeper-data/myid
5. 启动Zookeeper
  • 在每个节点上启动Zookeeper服务:
[root@kafka1 zookeeper]# ./bin/zkServer.sh start  
[root@kafka1 zookeeper]# ./bin/zkServer.sh status

三、Kafka部署
1. 安装Kafka
  • 解压Kafka到指定目录:
[root@kafka1 ~]# tar zxvf kafka_2.13-2.4.1.tgz  
[root@kafka1 ~]# mv kafka_2.13-2.4.1 /etc/kafka
2. 修改配置文件
  • 编辑Kafka配置文件(注意修改broker.id, listeners, 和zookeeper.connect):
    [root@kafka1 ~]# cd /etc/kafka/config  
    [root@kafka1 config]# vim server.properties
    broker.id=1  # 根据节点不同修改,其他节点为2和3  
    listeners=PLAINTEXT://192.168.10.101:9092  # 修改为各节点的IP  
    log.dirs=/etc/kafka/kafka-logs  
    zookeeper.connect=192.168.10.101:2181,192.168.10.102:2181,192.168.10.103:2181
    3. 创建日志目录
  • 在每个Kafka节点上创建日志目录:
[root@kafka1 kafka]# mkdir /etc/kafka/kafka-logs
4. 启动Kafka服务
  • 在每个Kafka节点上启动Kafka服务:
[root@kafka1 kafka]# ./bin/kafka-server-start.sh config/server.properties &

四、测试Kafka集群
1. 创建Topic
  • 在任意Kafka测试
[root@localhost bin]# ./kafka-topics.sh --create --zookeeper 192.168.10.101:2181 --replication-factor 1 --partitions 1 --topic test

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐