
消息队列KafKa的集群部署
KafKa的集群部署
🍅程序员小王的博客:程序员小王的博客
🍅 欢迎点赞 👍 收藏 ⭐留言 📝
🍅 如有编辑错误联系作者,如果有比较好的文章欢迎分享给我,我会取其精华去其糟粕
🍅java自学的学习路线:java自学的学习路线
Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。
Kafka对消息保存时根据Topic进行归类,发送消息者成为Producer,消息接受者成为Consumer,此外Kafka集群有多个Kafka实例组成,每个实例(server)成为broker。无论是Kafka集群,还是producer和consumer都依赖于zookeeper来保证系统可用性集群保存一些meta信息。
一、消息队列
1、为什么要有消息队列
2、什么是消息队列
MQ(Message Quene) : 翻译为 消息队列
,通过典型的 生产者和消费者
模型,生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,轻松的实现系统间解耦。别名为 消息中间件
通过利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。
-
消息 Message
网络中的两台计算机或者两个通讯设备之间传递的数据。例如说:文本、音乐、视频等内容。
-
队列 Queue
一种特殊的线性表(数据元素首尾相接),特殊之处在于只允许在首部删除元素和在尾部追加元素(FIFO)。入队、出队。
-
消息队列 MQ
消息+队列,保存消息的队列。消息的传输过程中的容器;主要提供生产、消费接口供外部调用做数据的存储和获取。
3、消息中间件的分类
当今市面上有很多主流的消息中间件,如老牌的 ActiveMQ 、 RabbitMQ ,炙手可热的Kafka ,阿里巴巴自主开发 RocketMQ 等。
-
Redis 基于Key-Value对的NoSQL数据库,同时支持MQ功能,可做轻量级队列服务使用。就入队操作而言,Redis对短消息(小于10kb)的性能比RabbitMQ好,长消息性能比RabbitMQ差
-
Kafka/Jafka 高性能跨语言的分布式发布/订阅消息系统,数据持久化,全分布式,同时支持在线和离线处理
-
RabbitMQ Erlang编写,支持多协议AMQP,XMPP,SMTP,STOMP。支持负载均衡、数据持久化。同时支持Peer-to-Peer和发布/订阅模式。
RabbitMQ前期我已经写过博客:
🍅基础版本:RabbitMQ安装以及消息模型使用攻略
🍅SpringBoot版本:SpringBoot中使用RabbitMQ
4、消息队列的分类
MQ主要分为两类:点对点(p2p),发布订阅(Pub/Sub
)
-
Peer-to-Peer 一般基于Pull或者Polling接收数据 发送到队列中的消息被一个而且仅仅一个接收者所接受,即使有多个接收者在同一个队列中侦听同一消息 即支持异步“即发即收”的消息传递方式,也支持同步请求/应答传送方式
-
发布订阅 发布到同一个主题的消息,可被多个订阅者所接收 发布/订阅即可基于Push消费数据,也可基于Pull或者Polling消费数据 解耦能力比P2P模型更强
5、p2p和发布订阅MQ的比较
-
共同点
消息生产者生产消息发送到queue中,然后消息消费者从queue中读取并且消费消息。
-
不同点
1. P2P模型包括:消息队列(Queue),发送者(Sender)、接收者(Recevier)
一个生产者生产的消息只有一个消费者(Consumer)(即一旦被消费,消息就不在消息队列中)。比如说打电话。
2.pub/Sub包含:消息队列(Queue)、主题(Topic)、发布者(Publisher)、订阅者(Subscriber)
每个消息可以有多个消费者,彼此互不影响。比如我发布一个微博:关注我的人都能够看到。
6、消息队列的使用场景
-
解耦 :各系统之间通过消息系统这个统一的接口交换数据,无须了解彼此的存在
-
**冗余 **:部分消息系统具有消息持久化能力,可规避消息处理前丢失的风险
-
扩展 :消息系统是统一的数据接口,各系统可独立扩展
-
峰值处理能力 消息系统可顶住峰值流量,业务系统可根据处理能力从消息系统中获取并处理对应量的请求
-
**可恢复性 **系统中部分键失效并不会影响整个系统,它恢复会仍然可从消息系统中获取并处理数据
-
**异步通信 **在不需要立即处理请求的场景下,可以将请求放入消息系统,合适的时候再处理
二、KafKa简介
1、什么是KafKa
Kafka是分布式的发布——订阅消息系统,它最初由LinkedIn(领英)公司发布,使用Scala语言编写,与2010年12月份开源,成为Apache的顶级项目。Kafka是一个高吞吐量的、持久性的、分布式发布订阅消息系统。它主要用于处理活跃live的数据(登录、浏览、点击、分享、喜欢等用户行为产生的数据)。
三大特点:
-
高吞吐量
可以满足每秒百万级别消息的生产和消费——生产消费。
-
持久性
有一套完善的消息存储机制,确保数据的高效安全的持久化——中间存储。
-
分布式
基于分布式的扩展和容错机制;Kafka的数据都会复制到几台服务器上。当某一台故障失效时,生产者和消费者转而使用其它的机器——整体
-
健壮性
2、设计目标
-
高吞吐率 在廉价的商用机器上单机可支持每秒100万条消息的读写
-
消息持久化 所有消息均被持久化到磁盘,无消息丢失,支持消息重放
-
完全分布式 Producer,Broker,Consumer均支持水平扩展
-
同时适应在线流处理和离线批处理
3、Kafka核心的概念
(1)一个消息队列需要哪些部分呢?
生产、消费、消息类别、存储等等。 对于kafka而言,kafka服务就像是一个大的水池。不
断的生产、存储、消费着各种类别的消息。
(2)那么kafka由何组成呢?
-
Kafka服务:
1. Topic:主题,Kafka处理的消息的不同分类。
2. Broker:消息服务器代理,Kafka集群中的一个kafka服务节点称为一个broker,主要存储消息数据。存在硬
盘中。每个topic都是有分区的。
3. Partition:Topic物理上的分组(part划分),一个topic在broker中被分为1个或者多个partition,分区在创建topic的时候
指定。
4. Message:消息,是通信的基本单位,每个消息都属于一个partition
-
Kafka服务相关:
1. Producer:消息和数据的生产者,向Kafka的一个topic发布消息。
2. Consumer:消息和数据的消费者,定于topic并处理其发布的消息。
3. Zookeeper:协调kafka的正常运行。
三、KafKa的分布式安装(部署三台)
IP | Kafka | Zookeeper | Jdk | Scala |
---|---|---|---|---|
192.168.5.128 | Kafka | Zookeeper | Jdk | Scala |
192.168.5.130 | Kafka | Zookeeper | Jdk | Scala |
192.168.5.131 | Kafka | Zookeeper | Jdk | Scala |
1、版本下载
-
安装包:http://archive.apache.org/dist/kafka/1.1.1/kafka_2.11-1.1.1.tgz
-
源码包:http://archive.apache.org/dist/kafka/1.1.1/kafka-1.1.1-src.tgz
2、安装Kafka的相关前提
(1)安装jdk
-
前提:由于Kafka是用Scala语言开发的,运行在JVM上,因此在安装Kafka之前需要先安装JDK。
yum install java-1.8.0-openjdk* -y
(2)安装Zookeeper
-
kafka之前的版本依赖zookeeper,所以需要先安装zookeeper,
现在kafka最新版本不在依赖于zk了!!!
但是我们目前的kafka是需要zookeeper的 -
将某台的zookper移动到另外一台linux上
scp -r /usr/soft/apache-zookeeper-3.6.3/ 192.168.5.130:/usr/soft/
-
集群配置
dataDir=/usr/soft/apache-zookeeper-3.6.3/data
dataLogDir=/usr/soft/apache-zookeeper-3.6.3/log
server.1=192.168.5.128:2888:3888
server.2=192.168.5.130:2888:3888
server.3=192.168.5.131:2888:3888
3、安装过程
-
从官网下载一个Kafka稳定版本,这里采用的是Kafka 2.11-1.1.1版本 Apache Kafka
-
解压文件:tar -zxvf kafka_2.11-1.1.1.tgz
(1)解压
[root@localhost soft]# tar -zxvf kafka_2.11-1.1.1.tgz -C /usr/apps/
(2)重命名
mv kafka_2.11-1.1.1 /usr/apps/kafka
(3)添加环境变量(这个文件最初是没有的)
-
在
/etc/profile.d/
中添加 my_env.sh文件, 此sh文件名称随意噢
vim /etc/profile.d/hadoop-etc.sh
export KAFKA_HOME=/usr/apps/kafka
export PATH=$PATH:$KAFKA_HOME/bin
-
更新
source /etc/profile.d/hadoop-etc.sh
(4)配置
-
修改/usr/apps/kafka/config/server.properties
vim /usr/apps/kafka/config/server.properties
## 当前kafka实例的id,必须为整数,一个集群中不可重复
broker.id=0
## 生产到kafka中的数据存储的目录,目录需要手动创建
log.dirs=/usr/apps/data/kafka
## kafka数据在zk中的存储目录
zookeeper.connect=1192.168.5.128:2181
#设置zookeeper的连接端口
zookeeper.connect=192.168.5.128:2181,192.168.5.130:2181,192.168.5.131:2181
#设置zookeeper的连接超时时间
zookeeper.connection.timeout.ms=6000
-
在bin/kafka-server-start.sh文件中,设置服务器可用内存大小,内存不足时,启动会报:error='Cannot allocate memory'
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
(5)分发安装包到另外两个虚拟机
scp -r /usr/apps/kafka/ 192.168.5.130:/usr/apps/kafka
scp -r /usr/apps/kafka/ 192.168.5.131:/usr/apps/kafka
-
修改192.168.5.130,192.168.5.131节点kafka配置文件conf/server.properties里面的broker.id和listeners的值分别为去1,2
ip | broker.id | listeners | zookper |
---|---|---|---|
192.168.5.128 | broker.id=0 | listeners=PLAINTEXT://192.168.5.128:9092 | zookeeper.connect=192.168.5.128:2181, 192.168.5.130:2181, 192.168.5.131:2181 |
192.168.5.130 | broker.id=1 | listeners=PLAINTEXT://192.168.5.130:9092 | zookeeper.connect=192.168.5.128:2181, 192.168.5.130:2181, 192.168.5.131:2181 |
192.168.5.130 | broker.id=2 | listeners=PLAINTEXT://192.168.5.131:9092 | zookeeper.connect=192.168.5.128:2181, 192.168.5.130:2181, 192.168.5.131:2181 |
四、Kafka集群启动
1、执行脚本启动zookeeper
-
所有zookeeper节点都需要执行
zkServer.sh start
-
显示以下信息表示启动正常
2、后台启动Kafka集群服务
-
所有Kafka节点都需要执行
[root@localhost kafka]# ./bin/kafka-server-start.sh config/server.properties &
-
启动成功
更多推荐
所有评论(0)