Linux Kafka 2.11-1.1.1 安装搭建
Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于
Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。
Kafka对消息保存时根据Topic进行归类,发送消息者成为Producer,消息接受者成为Consumer,此外Kafka集群有多个Kafka实例组成,每个实例(server)成为broker。无论是Kafka集群,还是producer和consumer都依赖于zookeeper来保证系统可用性集群保存一些meta信息。
安装流程
1、配置环境
配置Zookeeper环境参考:https://blog.csdn.net/wangjianan7357/article/details/81176580
2、下载
从官网下载一个Kafka稳定版本,这里采用的是Kafka 2.11-1.1.1版本 http://kafka.apache.org/downloads
解压文件:tar -zxvf kafka_2.11-1.1.1.tgz
3、配置
在config/server.properties文件中
broker.id=0
port=9092 #端口号
host.name=127.0.0.1 #服务器IP地址,修改为自己的服务器IP
其他配置参考:
参数 | 说明(解释) |
broker.id =0 | 每一个broker在集群中的唯一表示,要求是正数。当该服务器的IP地址发生改变时,broker.id没有变化,则不会影响consumers的消息情况 |
log.dirs=/data/kafka-logs | kafka数据的存放地址,多个地址的话用逗号分割,多个目录分布在不同磁盘上可以提高读写性能 /data/kafka-logs-1,/data/kafka-logs-2 |
port =9092 | broker server服务端口 |
message.max.bytes =6525000 | 表示消息体的最大大小,单位是字节 |
num.network.threads =4 | broker处理消息的最大线程数,一般情况下数量为cpu核数 |
num.io.threads =8 | broker处理磁盘IO的线程数,数值为cpu核数2倍 |
background.threads =4 | 一些后台任务处理的线程数,例如过期消息文件的删除等,一般情况下不需要去做修改 |
queued.max.requests =500 | 等待IO线程处理的请求队列最大数,若是等待IO的请求超过这个数值,那么会停止接受外部消息,应该是一种自我保护机制。 |
host.name | broker的主机地址,若是设置了,那么会绑定到这个地址上,若是没有,会绑定到所有的接口上,并将其中之一发送到ZK,一般不设置 |
socket.send.buffer.bytes=100*1024 | socket的发送缓冲区,socket的调优参数SO_SNDBUFF |
socket.receive.buffer.bytes =100*1024 | socket的接受缓冲区,socket的调优参数SO_RCVBUFF |
socket.request.max.bytes =100*1024*1024 | socket请求的最大数值,防止serverOOM,message.max.bytes必然要小于socket.request.max.bytes,会被topic创建时的指定参数覆盖 |
delete.topic.enable=true | 是否彻底删除Topic |
在bin/kafka-server-start.sh文件中,设置服务器可用内存大小,内存不足时,启动会报:error='Cannot allocate memory'
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
4、启动
执行脚本 ./bin/kafka-server-start.sh config/server.properties &
显示以下信息表示启动正常
5、停止
执行脚本 ./bin/kafka-server-stop.sh
6、创建Topic
bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test
7、列出所有Topic
bin/kafka-topics.sh --list --zookeeper 127.0.0.1:2181
8、删除Topic
bin/kafka-topics.sh --delete --zookeeper 127.0.0.1:2181 --topic test
如果没有在配置里设置彻底删除Topic,此处则只是将该Topic标志为删除
9、发送数据
bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test
// 输入内容
> 111111
> 222222
> 333333
> 444444
10、消费数据
bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic test --from-beginning
// 输出内容
111111
222222
333333
444444
更多推荐
所有评论(0)