zookeeper集群+kafka集群

zookeeper是一个开源的,分布式的,为分布式架构提供协调服务的APACHE的项目

保存元数据。

zookeeper:

zookeeper的工作机制:

观察者模式设计的分布式服务器管理架构

负责存储和管理元数据,记录集群的变化。保存集群变化的信息。

zookeeper的特点:

1、在集群中分为领导者和追随者,组成的集群。

2、只要有半数以上的节点正常工作,整个zookeeper就可以正常工作。zookeeper在部署时一般选择奇数台。

3、全局的数据是一致,每个zookeeper不论是领导者还是追随者,在访问他们的数据时都是一致的。

4、数据更新的原子性,一次更新数据,要么都成功,要么都失败

5、数据更新的实时性

6、领导者和追随者根据投票产生。

选举机制:

A B C

1、服务器A启动,发起一次选举,A会投自己一票。A有一票,不够半数。选举无法完成,A进入looking

2、服务器B投票,再发起一次选举,服务器B也投自己一票,服务器A和B会做一个比较,比较myid,谁的myid大,如果A比B小,A会把票改投给B,2票,B自动当选为leader。

3、C启动,自动成为追随者,A也会成为追随者。

tickTIme=2000
#通信心跳时间,zookeeper服务端和客户端之间通信的间隔时间,单位是毫秒
initLimit=10
#leader和follower初始连接时,最多能容忍的心跳数。秒
syncLimit=5
#leader和follower之间同步通信的超时时间。如果5*2时间内发生超时,leader就认为follower死了,会从集群中将其删除
dataDir=/opt/zookeeper/data
dataLogDir=/opt/zookeeper/logs
clinetPort=2181
server.1=192.168.60.91:3188:3288
server.2=192.168.60.92:3188:3288
server.3=192.168.60.93:3188:3288
#server.1   数字id,也就是服务器对应的myid
#192.168.60.91  服务器的IP地址
#3188   zookeeper集群内部通信的端口
#3288   重新选举端口,万一leader挂了,用这个端口进行内部通信,选举新的leader。
server.1=192.168.60.91:3188:3288
server.2=192.168.60.92:3188:3288
server.3=192.168.60.93:3188:3288

server.1 数字id,也就是服务器对应的myid

192.168.60.91 服务器的IP地址

3188 zookeeper集群内部通信的端口 3288 重新选举端口,万一leader挂了,用这个端口进行内部通信,选举新的leader。

zookeeper安装步骤:

安装 Zookeeper
cd /opt
tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz
mv apache-zookeeper-3.5.7-bin /opt/zookeeper
​
//修改配置文件
cd /opt/zookeeper/conf/
cp zoo_sample.cfg zoo.cfg
​
vim zoo.cfg
​
tickTime=2000   #通信心跳时间,Zookeeper服务器与客户端心跳时间,单位毫秒
initLimit=10    #Leader和Follower初始连接时能容忍的最多心跳数(tickTime的数量),这里表示为10*2s
syncLimit=5     #Leader和Follower之间同步通信的超时时间,这里表示如果超过5*2s,Leader认为Follwer死掉,
                 并从服务器列表中删除Follwer
dataDir=/opt/zookeeper/data      ●修改,指定保存Zookeeper中的数据的目录,目录需要单独创建
dataLogDir=/opt/zookeeper/logs   ●添加,指定存放日志的目录,目录需要单独创建
clientPort=2181   #客户端连接端口
#添加集群信息
server.1=192.168.60.91:3188:3288
server.2=192.168.60.92:3188:3288
server.3=192.168.60.93:3188:3288
​
//在每个节点上创建数据目录和日志目录
mkdir /opt/zookeeper/data
mkdir /opt/zookeeper/logs
​
//在每个节点的dataDir指定的目录下创建一个 myid 的文件
echo 1 > /opt/zookeeper/data/myid
echo 2 > /opt/zookeeper/data/myid
echo 3 > /opt/zookeeper/data/myid
​
//配置 Zookeeper 启动脚本
vim /etc/init.d/zookeeper
#!/bin/bash
#chkconfig:2345 20 90
#description:Zookeeper Service Control Script
ZK_HOME='/opt/zookeeper'
case $1 in
start)
    echo "---------- zookeeper 启动 ------------"
    $ZK_HOME/bin/zkServer.sh start
;;
stop)
    echo "---------- zookeeper 停止 ------------"
    $ZK_HOME/bin/zkServer.sh stop
;;
restart)
    echo "---------- zookeeper 重启 ------------"
    $ZK_HOME/bin/zkServer.sh restart
;;
status)
    echo "---------- zookeeper 状态 ------------"
    $ZK_HOME/bin/zkServer.sh status
;;
*)
    echo "Usage: $0 {start|stop|restart|status}"
esac
​

kafka:

kafka概述:

消息队列:MQ

在高并发环境下,同步的请求来不及处理,请求太多会造成阻塞。

比如说大量请求并发到数据库,too many connection报错。

消息队列,使用异步处理方式,可以缓解系统处理请求的压力。

kafka的作用:

1、异步处理

2、系统解耦

每个系统之间独立运行,互相之间没有必然的依赖关系。

主要是在微服务架构中,微服务架构中的通信对解耦来说至关重要。

各个微服务之间独立运行,分布处理各自的请求和消息。提高整个系统的吞吐量和处理能力。

电商的订单系统。网站的工单系统,典型的一个消息队列场景。

3、负责均衡

消息队列的负载均衡:把任务发送到多个消费者,多个消费者可以并行处理队列中的消息。

情况1:

情况2:

4、流量控制和限流

通过延迟方式,处理生产速率和消费者的处理速度(代码控制)。

5、数据同步和分发。

跨系统的数据同步和日志收集。

6、任务调度和定时任务

7、实时数据处理

8、备份和恢复

消息队列的模式:

1、点对点(一对一)消费者消费完数据之后,生产者会自动清除已消费的数据。

一个生产者对应一个消费者(淘汰)

2、发布/订阅模式(一对多/观察者模式,消费者数据在消费完之后不会被清除(保留一段时间))

生产者发布一个消息,可以是一个消费者使用,也可以是多个消费者同时使用。(主流)

kafka就是发布/订阅模式的消息队列。RAbbitMQ也是发布/订阅模式的消息队列,小集群内部使用。

大数据的实时处理领域。

kafka的特性:

高吞吐量,低延迟

每秒可以处理几十万条数据,延迟只有几毫秒

集群的可扩展性(热扩展)

消息的持久化:生产者发布的消息可以保存到磁盘当中,防止数据丢失(有时间限制)

容错性:挂了一个也可以继续使用

高并发:数千个客户端可以同时读写。

kafka的组件:

1、topic 主题,kafka的基本单元,所有生产者发布的消息都是发到主题。

消费者订阅主题,然后消费者生产者发布消息。

生产者 生产者把消息发布到主题

消费者 订阅主题,消费生产者发布的消息。

分区:每个主题都可以分成多个分区。每个分区都是数据的有序子集。

分区当中保留数据,按照偏移量来有序的存储数据。消费者可以根据偏移量来消费指定分区当中的消息(一般不用)

偏移量:消息在分区当中的唯一标识,跟踪和定位消息所在的位置。消费者可以根据偏移量来处理信息。

分区还有备份的作用,我们在创建主题时创建分区,创建分区时要指定副本数。

分区和我们执行的集群机器数量一般是保持一致的。

副本:备份分区中的消息。最少要2个,互为备份。

经纪人 broker 经纪人处理生产者和消费者的请求(kafka)元数据(zookeeper)

zookeeper:保存元数据

kafka的工作流程:

生产者将消息发布到指定的主题,每个消息都附带一个key和value

主题是有多个分区的,生产者把消息写入一个分区(带偏移量)

经纪人(kafka):分配和处理生产者的发布请求,偏移量也是经纪人分配(在分区中是唯一的)。

消费者订阅主题,获取全量的消费者的消费信息(默认模式),也可以从指定的分区获取消息(代码来完成,一般不用)

生产者发布的消息会在本地保留一段时间,防止消费者有延迟或者处理速度过慢,导致没有成功消费。保留时间:7天(默认的)

kafka-topics.sh --create --bootstrap-server 192.168.60.91:9092,192.168.60.92:9092,192.168.60.93:9092 --replication-factor 2 --partitions 3 --topic test1
--replication-factor 2      创建分区的副本数
--partitions 3              分区数
--topic test1               指定主题的名称
​

topic:主题名称

Partition:分区 偏移量

leader:分区的领导者,用来处理分区的读写操作。只有在指定写分区和指定读分区时才工作。如果不是指定,全量展示,无意义。

Replicas:副本,0 1 2 ---------broker id

ISR:表示与当前领导同步的副本 0 1 2

安装kafka操作步骤:

#在三台机器上把kafka_2.13-3.4.1.tgz导入opt目录下
[root@mysql1 opt]# rz -E
#解压压缩包
[root@mysql1 opt]# tar -xf kafka_2.13-3.4.1.tgz
#改个名字
[root@mysql1 opt]# mv kafka_2.13-3.4.1 /usr/local/kafka
#进入config目录下
[root@mysql1 opt]# cd /usr/local/kafka/config/
#将kafka配置文件备份
[root@mysql1 config]# cp server.properties server.properties.bak
#更改kafka的配置文件
[root@mysql1 config]# vim server.properties
 24 broker.id=0             #每天机子的id都要不一样
 34 listeners=PLAINTEXT://192.168.60.91:9092
 44 num.network.threads=3
    #broker处理网络请求的线程数,一般不用修改
 47 num.io.threads=8
    #这个的数值一定要大于磁盘数,处理磁盘读写的线程数
 50 socket.send.buffer.bytes=102400
    #发送套接字缓冲区的大小
 53 socket.receive.buffer.bytes=102400
    #接收套接字缓冲区的大小
 56 socket.request.max.bytes=104857600
    #请求套接字的缓冲区的大小
 62 log.dirs=/usr/local/kafka/logs          #日志文件的位置
 67 num.partitions=1
    #创建主题时,经纪人指定的分区数,如果指定的分区不同,这个值可以被覆
    盖
 105 log.retention.hours=168
     #消息在本地持久化保留的时间,168小时
 125 zookeeper.connect=192.168.60.91:2181,192.168.60.92:2181,192.168.60.93:2181
 #申明变量路径:(三台机子都要操作)
[root@mysql1 config]# vim /etc/profile          #在该配置文件的最后一行加入下面两行
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin
[root@mysql1 config]# source /etc/profile       #让该配置文件立即生效
[root@mysql1 config]# vim /etc/init.d/kafka
#!/bin/bash
#chkconfig:2345 22 88
#description:Kafka Service Control Script
KAFKA_HOME='/usr/local/kafka'
case $1 in
start)
    echo "---------- Kafka 启动 ------------"
    ${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties
;;
stop)
    echo "---------- Kafka 停止 ------------"
    ${KAFKA_HOME}/bin/kafka-server-stop.sh
;;
restart)
    $0 stop
    $0 start
;;
status)
    echo "---------- Kafka 状态 ------------"
    count=$(ps -ef | grep kafka | egrep -cv "grep|$$")
    if [ "$count" -eq 0 ];then
        echo "kafka is not running"
    else
        echo "kafka is running"
    fi
;;
*)
    echo "Usage: $0 {start|stop|restart|status}"
esac
#设置该配置文件的执行权限和开机自启
[root@mysql1 config]# chmod +x /etc/init.d/kafka    
[root@mysql1 config]# chkconfig --add kafka
#启动kafka服务并检查端口是否启动,kafka有可能假启动
[root@mysql2 config]# service kafka start
---------- Kafka 启动 ------------
[root@mysql2 config]# netstat -antp | grep 9092
tcp6       0      0 192.168.60.92:9092      :::*                    LISTEN      28467/java                    
创建主题:
kafka-topics.sh --create --bootstrap-server 192.168.60.91:9092,192.168.60.92:9092,192.168.60.93:9092 --replication-factor 2 --partitions 3 --topic test1
#2:副本数          3:服务器数      test1:主题名
​
生产者生产消息命令
[root@mysql1 config]# kafka-console-producer.sh --broker-list 192.168.60.91:9092,192.168.60.92:9092,192.168.60.93:9092 --topic test1
>test2   
>test3
>test4
​
消费者消费生产者生产的消息
[root@mysql2 config]# kafka-console-consumer.sh --bootstrap-server 192.168.60.91:9092,192.168.60.92:9092,192.168.60.93:9092 --topic test1 --from-beginning
test2
test3
test4

总结:

zookeeper 就是保存集群的元数据

kafka的工作流程

组件的作用

kafka的消息堆积该如何解决:

消费者出现了延迟或者处理能力太差,导致消息堆积。

1、减少kafka持久化的保存时间

2、修改主题的分区数,扩大分区的数量,提高消费者获取的通道

3、可以指定多个消费者共同工作,处理消息的积压

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐