zookeeper集群+kafka集群
消息队列:MQ在高并发环境下,同步的请求来不及处理,请求太多会造成阻塞。比如说大量请求并发到数据库,too many connection报错。消息队列,使用异步处理方式,可以缓解系统处理请求的压力。zookeeper 就是保存集群的元数据kafka的工作流程组件的作用消费者出现了延迟或者处理能力太差,导致消息堆积。1、减少kafka持久化的保存时间2、修改主题的分区数,扩大分区的数量,提高消费者
zookeeper集群+kafka集群
zookeeper是一个开源的,分布式的,为分布式架构提供协调服务的APACHE的项目
保存元数据。
zookeeper:
zookeeper的工作机制:
观察者模式设计的分布式服务器管理架构
负责存储和管理元数据,记录集群的变化。保存集群变化的信息。
zookeeper的特点:
1、在集群中分为领导者和追随者,组成的集群。
2、只要有半数以上的节点正常工作,整个zookeeper就可以正常工作。zookeeper在部署时一般选择奇数台。
3、全局的数据是一致,每个zookeeper不论是领导者还是追随者,在访问他们的数据时都是一致的。
4、数据更新的原子性,一次更新数据,要么都成功,要么都失败
5、数据更新的实时性
6、领导者和追随者根据投票产生。
选举机制:
A B C
1、服务器A启动,发起一次选举,A会投自己一票。A有一票,不够半数。选举无法完成,A进入looking
2、服务器B投票,再发起一次选举,服务器B也投自己一票,服务器A和B会做一个比较,比较myid,谁的myid大,如果A比B小,A会把票改投给B,2票,B自动当选为leader。
3、C启动,自动成为追随者,A也会成为追随者。
tickTIme=2000 #通信心跳时间,zookeeper服务端和客户端之间通信的间隔时间,单位是毫秒 initLimit=10 #leader和follower初始连接时,最多能容忍的心跳数。秒 syncLimit=5 #leader和follower之间同步通信的超时时间。如果5*2时间内发生超时,leader就认为follower死了,会从集群中将其删除 dataDir=/opt/zookeeper/data dataLogDir=/opt/zookeeper/logs clinetPort=2181 server.1=192.168.60.91:3188:3288 server.2=192.168.60.92:3188:3288 server.3=192.168.60.93:3188:3288 #server.1 数字id,也就是服务器对应的myid #192.168.60.91 服务器的IP地址 #3188 zookeeper集群内部通信的端口 #3288 重新选举端口,万一leader挂了,用这个端口进行内部通信,选举新的leader。
server.1=192.168.60.91:3188:3288 server.2=192.168.60.92:3188:3288 server.3=192.168.60.93:3188:3288
server.1 数字id,也就是服务器对应的myid
192.168.60.91 服务器的IP地址
3188 zookeeper集群内部通信的端口 3288 重新选举端口,万一leader挂了,用这个端口进行内部通信,选举新的leader。
zookeeper安装步骤:
安装 Zookeeper cd /opt tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz mv apache-zookeeper-3.5.7-bin /opt/zookeeper //修改配置文件 cd /opt/zookeeper/conf/ cp zoo_sample.cfg zoo.cfg vim zoo.cfg tickTime=2000 #通信心跳时间,Zookeeper服务器与客户端心跳时间,单位毫秒 initLimit=10 #Leader和Follower初始连接时能容忍的最多心跳数(tickTime的数量),这里表示为10*2s syncLimit=5 #Leader和Follower之间同步通信的超时时间,这里表示如果超过5*2s,Leader认为Follwer死掉, 并从服务器列表中删除Follwer dataDir=/opt/zookeeper/data ●修改,指定保存Zookeeper中的数据的目录,目录需要单独创建 dataLogDir=/opt/zookeeper/logs ●添加,指定存放日志的目录,目录需要单独创建 clientPort=2181 #客户端连接端口 #添加集群信息 server.1=192.168.60.91:3188:3288 server.2=192.168.60.92:3188:3288 server.3=192.168.60.93:3188:3288 //在每个节点上创建数据目录和日志目录 mkdir /opt/zookeeper/data mkdir /opt/zookeeper/logs //在每个节点的dataDir指定的目录下创建一个 myid 的文件 echo 1 > /opt/zookeeper/data/myid echo 2 > /opt/zookeeper/data/myid echo 3 > /opt/zookeeper/data/myid //配置 Zookeeper 启动脚本 vim /etc/init.d/zookeeper #!/bin/bash #chkconfig:2345 20 90 #description:Zookeeper Service Control Script ZK_HOME='/opt/zookeeper' case $1 in start) echo "---------- zookeeper 启动 ------------" $ZK_HOME/bin/zkServer.sh start ;; stop) echo "---------- zookeeper 停止 ------------" $ZK_HOME/bin/zkServer.sh stop ;; restart) echo "---------- zookeeper 重启 ------------" $ZK_HOME/bin/zkServer.sh restart ;; status) echo "---------- zookeeper 状态 ------------" $ZK_HOME/bin/zkServer.sh status ;; *) echo "Usage: $0 {start|stop|restart|status}" esac
kafka:
kafka概述:
消息队列:MQ
在高并发环境下,同步的请求来不及处理,请求太多会造成阻塞。
比如说大量请求并发到数据库,too many connection报错。
消息队列,使用异步处理方式,可以缓解系统处理请求的压力。
kafka的作用:
1、异步处理
2、系统解耦
每个系统之间独立运行,互相之间没有必然的依赖关系。
主要是在微服务架构中,微服务架构中的通信对解耦来说至关重要。
各个微服务之间独立运行,分布处理各自的请求和消息。提高整个系统的吞吐量和处理能力。
电商的订单系统。网站的工单系统,典型的一个消息队列场景。
3、负责均衡
消息队列的负载均衡:把任务发送到多个消费者,多个消费者可以并行处理队列中的消息。
情况1:
情况2:
4、流量控制和限流
通过延迟方式,处理生产速率和消费者的处理速度(代码控制)。
5、数据同步和分发。
跨系统的数据同步和日志收集。
6、任务调度和定时任务
7、实时数据处理
8、备份和恢复
消息队列的模式:
1、点对点(一对一)消费者消费完数据之后,生产者会自动清除已消费的数据。
一个生产者对应一个消费者(淘汰)
2、发布/订阅模式(一对多/观察者模式,消费者数据在消费完之后不会被清除(保留一段时间))
生产者发布一个消息,可以是一个消费者使用,也可以是多个消费者同时使用。(主流)
kafka就是发布/订阅模式的消息队列。RAbbitMQ也是发布/订阅模式的消息队列,小集群内部使用。
大数据的实时处理领域。
kafka的特性:
高吞吐量,低延迟
每秒可以处理几十万条数据,延迟只有几毫秒
集群的可扩展性(热扩展)
消息的持久化:生产者发布的消息可以保存到磁盘当中,防止数据丢失(有时间限制)
容错性:挂了一个也可以继续使用
高并发:数千个客户端可以同时读写。
kafka的组件:
1、topic 主题,kafka的基本单元,所有生产者发布的消息都是发到主题。
消费者订阅主题,然后消费者生产者发布消息。
生产者 生产者把消息发布到主题
消费者 订阅主题,消费生产者发布的消息。
分区:每个主题都可以分成多个分区。每个分区都是数据的有序子集。
分区当中保留数据,按照偏移量来有序的存储数据。消费者可以根据偏移量来消费指定分区当中的消息(一般不用)
偏移量:消息在分区当中的唯一标识,跟踪和定位消息所在的位置。消费者可以根据偏移量来处理信息。
分区还有备份的作用,我们在创建主题时创建分区,创建分区时要指定副本数。
分区和我们执行的集群机器数量一般是保持一致的。
副本:备份分区中的消息。最少要2个,互为备份。
经纪人 broker 经纪人处理生产者和消费者的请求(kafka)元数据(zookeeper)
zookeeper:保存元数据
kafka的工作流程:
生产者将消息发布到指定的主题,每个消息都附带一个key和value
主题是有多个分区的,生产者把消息写入一个分区(带偏移量)
经纪人(kafka):分配和处理生产者的发布请求,偏移量也是经纪人分配(在分区中是唯一的)。
消费者订阅主题,获取全量的消费者的消费信息(默认模式),也可以从指定的分区获取消息(代码来完成,一般不用)
生产者发布的消息会在本地保留一段时间,防止消费者有延迟或者处理速度过慢,导致没有成功消费。保留时间:7天(默认的)
kafka-topics.sh --create --bootstrap-server 192.168.60.91:9092,192.168.60.92:9092,192.168.60.93:9092 --replication-factor 2 --partitions 3 --topic test1
--replication-factor 2 创建分区的副本数 --partitions 3 分区数 --topic test1 指定主题的名称
topic:主题名称
Partition:分区 偏移量
leader:分区的领导者,用来处理分区的读写操作。只有在指定写分区和指定读分区时才工作。如果不是指定,全量展示,无意义。
Replicas:副本,0 1 2 ---------broker id
ISR:表示与当前领导同步的副本 0 1 2
安装kafka操作步骤:
#在三台机器上把kafka_2.13-3.4.1.tgz导入opt目录下 [root@mysql1 opt]# rz -E #解压压缩包 [root@mysql1 opt]# tar -xf kafka_2.13-3.4.1.tgz #改个名字 [root@mysql1 opt]# mv kafka_2.13-3.4.1 /usr/local/kafka #进入config目录下 [root@mysql1 opt]# cd /usr/local/kafka/config/ #将kafka配置文件备份 [root@mysql1 config]# cp server.properties server.properties.bak #更改kafka的配置文件 [root@mysql1 config]# vim server.properties 24 broker.id=0 #每天机子的id都要不一样 34 listeners=PLAINTEXT://192.168.60.91:9092 44 num.network.threads=3 #broker处理网络请求的线程数,一般不用修改 47 num.io.threads=8 #这个的数值一定要大于磁盘数,处理磁盘读写的线程数 50 socket.send.buffer.bytes=102400 #发送套接字缓冲区的大小 53 socket.receive.buffer.bytes=102400 #接收套接字缓冲区的大小 56 socket.request.max.bytes=104857600 #请求套接字的缓冲区的大小 62 log.dirs=/usr/local/kafka/logs #日志文件的位置 67 num.partitions=1 #创建主题时,经纪人指定的分区数,如果指定的分区不同,这个值可以被覆 盖 105 log.retention.hours=168 #消息在本地持久化保留的时间,168小时 125 zookeeper.connect=192.168.60.91:2181,192.168.60.92:2181,192.168.60.93:2181 #申明变量路径:(三台机子都要操作) [root@mysql1 config]# vim /etc/profile #在该配置文件的最后一行加入下面两行 export KAFKA_HOME=/usr/local/kafka export PATH=$PATH:$KAFKA_HOME/bin [root@mysql1 config]# source /etc/profile #让该配置文件立即生效 [root@mysql1 config]# vim /etc/init.d/kafka #!/bin/bash #chkconfig:2345 22 88 #description:Kafka Service Control Script KAFKA_HOME='/usr/local/kafka' case $1 in start) echo "---------- Kafka 启动 ------------" ${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties ;; stop) echo "---------- Kafka 停止 ------------" ${KAFKA_HOME}/bin/kafka-server-stop.sh ;; restart) $0 stop $0 start ;; status) echo "---------- Kafka 状态 ------------" count=$(ps -ef | grep kafka | egrep -cv "grep|$$") if [ "$count" -eq 0 ];then echo "kafka is not running" else echo "kafka is running" fi ;; *) echo "Usage: $0 {start|stop|restart|status}" esac #设置该配置文件的执行权限和开机自启 [root@mysql1 config]# chmod +x /etc/init.d/kafka [root@mysql1 config]# chkconfig --add kafka #启动kafka服务并检查端口是否启动,kafka有可能假启动 [root@mysql2 config]# service kafka start ---------- Kafka 启动 ------------ [root@mysql2 config]# netstat -antp | grep 9092 tcp6 0 0 192.168.60.92:9092 :::* LISTEN 28467/java
创建主题: kafka-topics.sh --create --bootstrap-server 192.168.60.91:9092,192.168.60.92:9092,192.168.60.93:9092 --replication-factor 2 --partitions 3 --topic test1 #2:副本数 3:服务器数 test1:主题名 生产者生产消息命令 [root@mysql1 config]# kafka-console-producer.sh --broker-list 192.168.60.91:9092,192.168.60.92:9092,192.168.60.93:9092 --topic test1 >test2 >test3 >test4 消费者消费生产者生产的消息 [root@mysql2 config]# kafka-console-consumer.sh --bootstrap-server 192.168.60.91:9092,192.168.60.92:9092,192.168.60.93:9092 --topic test1 --from-beginning test2 test3 test4
总结:
zookeeper 就是保存集群的元数据
kafka的工作流程
组件的作用
kafka的消息堆积该如何解决:
消费者出现了延迟或者处理能力太差,导致消息堆积。
1、减少kafka持久化的保存时间
2、修改主题的分区数,扩大分区的数量,提高消费者获取的通道
3、可以指定多个消费者共同工作,处理消息的积压
更多推荐
所有评论(0)