zookeeper和消息队列kafka
一个Topic会产生多个分区Partition,分区中分为Leader和Follower,消息一般发送到Leader,Follower通过数据的同步与Leader保持同步,消费的话也是在Leader中发生消费,如果多个消费者,则分别消费Leader和各个Follower中的消息,当Leader发生故障的时候,某个Follower会成为主节点,此时会对齐消息的偏移量。理解成一个逻辑概念,下面有很多的
ZooKeeper是一种分布式应用所涉及的高可用、高性能且一致性的开源协调服务,
它提供了一项基本服务:分布式锁服务。分布式应用可以基于它实现更高级的服务,实现诸如同步服务、配置维护和集群管理或者命名的服务。
Zookeeper服务自身组成集群,2n+1个(奇数)主机。
在集群中,允许n个主机宕机,只要集群中有一半以上的机器可用,zookeeper集群就可用。
#比如: 1、假如zookeeper为3台机器组成的集群,那么就可以允许失效一台,如果失效了2台,就会导致zookeeper集群不可用。 2、所以在搭建zookeeper集群时,主机数需要为奇数。 3、奇数的目的:为了提高容错能允许多损失一台。
Zookeeper从设计模式角度来理解
是一个基于观察者模式设计的分布式服务管理框架,它负责存储和管理大家都关心的数据,然后接受观察者的注册,一旦这些数据的状态发生变化,Zookeeper就将负责通知已在Zookeeper上注册的那些观察者做出相应的反应。
也就是说Zookeeper=文件系统+通知机制。
文件系统: 就是将存储的数据通过zookeeper的文件系统进行存储到各个节点上。
通知机制: 当某个节点出现故障,zookeeper会将信息通知到客户端上。
总结:每个节点服务器都会在zookeeper中进行注册登记,client也获取当前在线服务器的列表,也会在zookeeper上进行注册登记,client在zookeeper集群上存储的数据。都会通过文件系统分布式存储到各个集群节点中,当集群中某个节点出现故障,zookeeper也会通知到client客户端。
Zookeeper数据模型的结构与linux文件系统很类似,整体上可以看作时一棵树,每个节点乘坐一个Znode。
每个znode默认能存储1mb的数据,每个znode都可以通过其路径唯一标识。
Zookeeper: 一个领导者(Leader),多个跟随者(Follower)组成的集群
Zookeeper集群中只有半数以上节点存储,Zookeeper集群就能正常服务,所以Zookeeper适合安装奇数台服务器。
全局数据一致性: 每个server保存一份相同的数据副本,client无论连接到哪个server,数据都是一致的。
更新请求顺序执行: 来自同一个client的更新请求按其发送顺序依次执行,即先进先出。
数据更新原子性: 一次数据更新,要么成功,要么失败。
实时性: 在一定范围内,client能读到最新数据
总结:zookeeper集群中有一个lleader和多个follower,且zookeeper集群具有数据的一致性、原子性、实时性。且数据更新时按照发送顺序进行更新。
5.1 第一次启动选举机制
1、服务器1启动,发起一次选举。服务器1投自己一票:
此时服务器1的票数为1票,不够半数以上(5台节点,半数2.5),选举无法完成,服务器1状态保持为LOOKING.
2、服务器2启动,再发起一次选举,服务器1和2分别投自己1票,并交换选票信息:
此时服务器1发现服务器2的myid比自己投票的服务器myid要大,所以将票投给服务器
此时服务器1有0票,服务器2有2票,不够半数,选举无法完成,服务器1和2都保持looking状态。
3、服务器3启动,发起一次选举:
服务器1和2发现服务器3的myid最大,就将选票信息投给服务器3.
此时服务器1和2都有0票。服务器3有3票,超过半数,服务器3称为leader,服务器1和2更换状态为following。
4、服务器4启动,发起一次选举,此时服务器1,2,3已不是looking状态,不会更改选票信息:
此时服务器3为3票,服务器4为1票,此时服务器服从多数,更改修选票信息为服务器3,并更改状态为following。
5、服务器5启动,情况和4一样,状态为following。
5.2 非第一次启动选举机制
1、当zookeerper集群中一台服务器出现一下两种请款之一时,就会开始进行leader选举。
服务器初始化启动(第一次启动选举机制)
服务器运行期间无法和leader保持连接(不知道leader是否已经产生了,或者leader宕机)
2、当一台机器进入leader选举流程时,当前集群也可能处于一下两种状态。
①集群中本来就已经存储一个leader
对于已存在leader的情况,机器试图去选举时,被会告知当前服务器的leader信息,对于该机器来说,仅仅需要和leader机器建立连接,并进行状态同步即可。
②集群中leader宕机了。
假设zookeeper由5台服务器组成,SID分别为1,2,3,4,5。ZXID分别为:8,8,8,7.并且此时SID为3的服务器时leader。
某一时刻,当3和5都出现故障时,因此又重新开始选举。
#选举leader的规则 1、EPOCH大的直接胜出。 2、EPOCH相同,事务ID(ZXID)大的胜出. 3、ZXID相同,服务器ID大的胜出 ----------------名词解释----------------------- #1、SID: 服务器ID,用来标识一台Zookeeper集群中的机器,每台机器不能重复,和myid一致。 #2、ZXID 事务ID,ZXID是一个事务ID,用来标识一次服务器状态的变更,在某一时刻,集群中的每台机器的ZXID值不一定完全一致,这和Zookeeper服务器对于客户端“更新请求”的处理逻辑速度有关。 #2、Epoch 每个leader任期的代号,没有leader时选举方式跟第一次启动方法相同,每投完一次一票,这个数据就会增加。
5.3 总结
第一次启动选举机制:
主要需要看启动顺序,再看他的myid,只要选举票数超过半数,就会选举出一个leader。新加的机器都会指向这个leader。
非第一次启动选举机制
假如存在leader,新加入的机器会获取到leader的信息,然后进行连接。
假如没有leader,会先比较Epoch(任期数),再比较ZXID(事务ID),再比较SID(服务id)。
提供的服务包括:统一命令服务,统一配置管理,统一集群管理,服务节点动态上下线,软负载均衡等,
①统一命令服务:
在分布式环境下,经常需要对应用/服务进行统一命令,便于识别,例如:IP容易记住,而域名容易记住。
②统一配置管理:
分布式环境下,配置文件同步非常常见,一般要求一个集群中,所有节点的配置信息是一致的,比如kafka集群,对配置文件修改后,希望能快速同步到各个节点上。
配置管理可交由Zookeeper实现,可将配置信息写入Zookeeper上的zonde,各个客户端服务器监听这个znode,一旦znode中的数据被修改,zookeeper将通知各个客户端服务器。
③统一集群管理:
分布式环境中,时实掌握每个节点的状态是必要的,可根据节点时实状态制作出一些调整,
zookeeper可以实现时实监控节点状态变化,可将节点信息写入zookeeper上的Znode。监听这个Znode可以获取它的时实状态变化。
④服务动态上下线:
客户端能时实洞察到服务器上下线的变化。(是否宕机)
⑤软负载均衡:
在Zookeeper中记录每台服务器的访问数,让访问数最少的服务器去处理最新的客户端请求。
1.1 环境架构
主机名 ip地址 安装软件 系统版本 node1 192.168.111.19 apache-zookeeper-3.5.7-bin.tar.gz centos7.5 node2 192.168.111.20 apache-zookeeper-3.5.7-bin.tar.gz centos7.5 node2 192.168.111.21 apache-zookeeper-3.5.7-bin.tar.gz centos7.5
三台全部关闭防火墙
#三台主机都需要执行,以node1演示 systemctl stop firewalld systemctl disable firewalld setenforce 0 hostnamectl set-hostname node{1,2,3}
安装环境
yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel #这里yum安装JDK环境。方便。 java -version #获取软件包 cd /opt wget https://archive.apache.org/dist/zookeeper/zookeeper-3.5.7/apache-zookeeper-3.5.7-bin.tar.gz #或者直接将软件包上传到/opt目录下。 tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz mv apache-zookeeper-3.5.7-bin /usr/local/zookeeper-3.5.7
修改配置文件
cd /usr/local/zookeeper-3.5.7/conf cp zoo_sample.cfg zoo.cfg vim zoo.cfg tickTime=2000 #通信心跳时间,Zookeeper服务器与客户端心跳时间,单位毫秒 initLimit=10 #Leader和Follower初始连接时能容忍的最多心跳数( tickTime的数量),这里表示为10*2s syncLimit=5 #Leader和Follower之间同步通信的超时时间,这里表示如果超过5*2s,Leader认为Follwer死掉,并从服务器列表中删除Follwer dataDir=/usr/local/zookeeper-3.5.7/data #●修改,指定保存Zookeeper中的数据的目录,目录需要单独创建 dataLogDir=/usr/local/zookeeper-3.5.7/1ogs #●添加,指定存放日志的目录,目录需要单独创建 clientPort=2181 #客户端连接端口 #添加集群信息 server.1=192.168.111.19:3188:3288 server.2=192.168.111.20:3188:3288 server.3=192.168.111.21:3188:3288 mkdir /usr/local/zookeeper-3.5.7/data mkdir /usr/local/zookeeper-3.5.7/logs
到这里就不要设置同步了,下面的操作,做好一台机器一台机器的配置。
echo 1 >/usr/local/zookeeper-3.5.7/data/myid # node1上配置 echo 2 >/usr/local/zookeeper-3.5.7/data/myid #node2上配置 echo 3 >/usr/local/zookeeper-3.5.7/data/myid #node3上配置 #//配置启动脚本,脚本在开启启动执行的目录中创建 vim /etc/init.d/zookeeper #!/bin/bash #chkconfig:2345 20 90 #description:Zookeeper Service Control Script ZK_HOME='/usr/local/zookeeper-3.5.7' case $1 in start) echo "----------zookeeper启动----------" $ZK_HOME/bin/zkServer.sh start ;; stop) echo "---------- zookeeper停止-----------" $ZK_HOME/bin/zkServer.sh stop ;; restart) echo "---------- zookeeper 重启------------" $ZK_HOME/bin/zkServer.sh restart ;; status) echo "---------- zookeeper 状态------------" $ZK_HOME/bin/zkServer.sh status ;; *) echo "Usage: $0 {start|stop|restart|status}" esac chmod +x /etc/init.d/zookeeper chkconfig --add zookeeper service zookeeper start service zookeeper status
192.168.111.21
192.168.111.19
192.168.111.20
启动node1
启动node2
启动node3
kafka是由Linkedin公司开发,是一个分布式,支持分区(partition)、多副本的(repilca),基于zookeeper协调的分布式消息系统。
它的最大的特性就是可以实时的处理大量数据以满足各种需求场景 ,比如:基于hadoop(分布式)的批处理系统,低延迟的实时系统,storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源 项目。
kafka时一个分布式流式计算平台,但常用于消息系统使用。他是一个分布式消息队列
kafka的特性
高吞吐量、低延迟 : kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒。
可扩展性(分布式): kafka集群支持热扩展
持久性、可靠性: 消息被持久化到本地磁盘,并且支持数据备份防止数据丢失
容错性: 允许集群中节点失败
高并发: 支持数千个客户端同时读写。
1、使用kafka消息队列的好处
解耦
允许你独立的扩展或修改两边的处理过程,只要确保他们遵守同样的接口约束。
可恢复性
系统的一部分组件失效时,不会影响到整个系统,消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。
缓冲
有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。
灵活性和峰值处理能力
访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。
异步通信
很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。
四、kafka的场景应用和模式
日志收集: 一个公司可以用kafka手机各种服务的日志,通过kafka以统一接口服务的方式开放给各种consumer(消费者)。
消息系统: 解耦和生产者和消费者、缓存消息等。
用户活动跟踪: kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。
运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
流式处理
事件源
kafka的模式
kafka的模式不遵守JMS规范(消息队列的规范)。
1、点对点消息传递模式
(一对一,消费者主动拉取数据,消息收到后消息清除)
在点对点消息系统中,消息持久化到一个队列中。此时,将有一个或多个消费者消费队列中的数据。
但是一条消息只能被消费一次。当一个消费者消费了队列中的某条数据之后,该条数据则从消息队列中删除。
该模式即使有多个消费者同时消费数据,也能保证数据处理的顺序。
2、发布订阅消息传递模式
在发布 – 订阅消息系统中,消息被持久化到一个 topic 中。
与点对点消息系统不同的是,消费者可以订阅一个或多个 topic,消费者可以消费该 topic 中所有的数据,同一条数据可以被多个消费者消费,数据被消费后不会立马删除。
1、Producer(生产者):消息的生产者,是消息的入口
2、Broker(实例):一台kafka服务器就是一个broker,一个集群由多个broker组成。一个broker可以容纳多个topic(主题)
3、Topic(主题):消息的主题,可以理解成消息的分类,kafka获取到的数据就是按照不同的类型存储在不同的topic主题中。
topic主题中有很多的分区。
4、Partition(分区):
Topic的分区,每个Topic可以有很多个分区,分区作用是用来做负载均衡,提高kafka的吞吐量。
同一个Topic在不同的分区的数据是不重复的,分区的表现形式就是一个个文件夹。
分区的原因: ①方便在集群中扩展,②实现负载均衡的效果。③提高kafka的吞吐量。④实现高并发的效果。
5、Repica(副本):每一个分区都有多个副本,副本的作用就是备份数据。一个Topic的每个分区都有若干个副本,一个leader和若干个follower。
当主分区故障后,副本进行顶替它的位置。
6、leader(领导者):每个分区有多个副本,其中有且仅有一个作为leader,leader是当前负责数据读写的分区。
7、follower(追随者):follower跟随leader,所有请求都通过leader路由,数据变更会广播给所有follower,follower和leader保持数据同步,follower只负责备份,不负责数据的读写。
如果leader故障,则从follower中选举出一个新的leader。
当follower挂掉,卡主或者同步太慢,leader会把这个follower从集群列表中删除,重新创建一个follower。
#实例和主题和分区和副本的理解。 1、实例(broker)就是一台服务器,装好kafka的服务器。多个实例组成一个kafka集群 2、主题(topic)是存储的类型,将不同生产者生成的数据按照类型存储。理解成一个逻辑概念,下面有很多的分区(partition),分区才是用来存储具体数据的,分区也会称为leader。每一个分区都会进行生成一个或多个副本(Repica),用来备份分区的数据,也会被称为follower。 一个Topic会产生多个分区Partition,分区中分为Leader和Follower,消息一般发送到Leader,Follower通过数据的同步与Leader保持同步,消费的话也是在Leader中发生消费,如果多个消费者,则分别消费Leader和各个Follower中的消息,当Leader发生故障的时候,某个Follower会成为主节点,此时会对齐消息的偏移量。
8、Message(消息):消息的实体
9、Consumer:消费者,消息的出口
10、Consumer Group:
多个消费者组成一个消费者组,在kafka的设计中,同一个分区的数据只能被消息者组中的某一个消费者消费,同一个的消费组的消费者可以消费同一个主题中的不同分区的数据。
11、offset偏移量:
可以唯一的标识一条消息
偏移量决定读取数据的位置,不会有线程安全的问题,消费者通过偏移量来决定下次读取的消息(即消费位置)。
消费被消费之后,并不会被删除,这样多个业务就可以重复使用kafka的消息。
某一个业务也可以通过修改偏移量达到重新读取消息的目的,偏移量由用户控制。
消息最终还是会被删除,默认生命周期为1周(168小时)。
12、zookeeper:kafka集群依赖zookeeper来存储meta(变化)信息。
#offset和zookeeper理解 由于consumer在消费过程中可能会出现断电宕机等故障,consumer恢复后,需要从故障前的位置继续消费,所以consumer需要时实记录自己消费到了哪个offset,以便恢复后继续消费。 kafka 在0.9 版本之前,consumer默认将offset保存在zookeeper中。 从0.9版本后,consumer默认将offset保存在kafka一个内置的topic中,该topic为_consumer_offset. 也就是说,zookeeper的作用就是,生产者输出数据到kafka集群,就必须要找到kafka集群的节点在哪里,这些都是通过zookeeper去寻找的,消费者消费到哪一条数据,也需要zookeeper的支持,从zookeeper获得offset,offset记录上一次消费的数据消费到哪里,这样就可以接着下一跳数据进行消费。
1、生产者先获取分区中的leader。
2、Producter将消息发送给leader
3、Leader将消息写入本机文件
4、Follower从leader同步消息(follower主动去leader进行同步)
5、Follower将消息写入本地后向leader发送ACK确认消息。
6、leader收到所有副本的ACK后,向生产者发送ACK确认消息。
1、分区的原因
便于在集群中扩展,每个Partition可以通过调整以适应它所在的机器,而一个topic又可以有多个Partition组成,因此整个集群就可以适应任意大小的数据了。
可以提高并发,因为可以以分区为单位读写。
2、分区的目的
生产者采用push模式将数据发布到broker,每条消息追加到分区中,顺序写入磁盘,所以保证同一分区内的数据时有序的。
数据会写入到不同的分区。为什么进行分区?
**1、方便扩展:**因为一个topic可以有多个partition,所以我们可以通过扩展机器去轻松的应对日益增长的数据。
2、提高并发:以分区为单位进行读写数据,提高消费的处理效率。
类似于负载均衡,当我们向某个服务器发送请求的时候,服务端可能会对请求做一个负载,将流量分发到不同的服务器,那在kafka中,如果某个topic有多个partition,producer又怎么知道该将数据发往哪个partition呢?kafka中有几个原则:
prtition在写入的时候可以指定需要写入的partition,如果有指定,则写入对应的partition。
如果没有指定partition,但是设置了数据的key,则会根据key的值hash出一个partition。
如果既没指定partition,又没有设置key,则会轮询选出一个partition。
保证消息不丢失是一个消息队列中间件的基本保证,那producer在向kafka写入消息的时候,怎么保证消息不丢失呢?其实上面的写入流程图中有描述出来,那就是通过ACK应答机制!在生产者向队列写入数据的时候可以设置参数来确定是否确认kafka接收到数据,这个参数可设置的值为0、1、all。
1、0代表producer往集群发送数据不需要等到集群的返回,不确保消息发送成功。安全性最低但是效率最高。
2、1代表producer往集群发送数据只要leader应答就可以发送下一条,只确保leader发送成功。
3、all代表producer往集群发送数据需要所有的follower都完成从leader的同步才会发送下一条,确保leader发送成功和所有的副本都完成备份。安全性最高,但是效率最低。
效率最高。
2、1代表producer往集群发送数据只要leader应答就可以发送下一条,只确保leader发送成功。
3、all代表producer往集群发送数据需要所有的follower都完成从leader的同步才会发送下一条,确保leader发送成功和所有的副本都完成备份。安全性最高,但是效率最低。
更多推荐
所有评论(0)