Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。

Kafka对消息保存时根据Topic进行归类,发送消息者成为Producer,消息接受者成为Consumer,此外Kafka集群有多个Kafka实例组成,每个实例(server)成为broker。无论是Kafka集群,还是producer和consumer都依赖于zookeeper来保证系统可用性集群保存一些meta信息。

安装流程

1、配置环境

配置Zookeeper环境参考:https://blog.csdn.net/wangjianan7357/article/details/81176580

2、下载

从官网下载一个Kafka稳定版本,这里采用的是Kafka 2.11-1.1.1版本 http://kafka.apache.org/downloads

解压文件:tar -zxvf kafka_2.11-1.1.1.tgz

3、配置

在config/server.properties文件中

broker.id=0
port=9092 #端口号
host.name=127.0.0.1 #服务器IP地址,修改为自己的服务器IP

其他配置参考:

参数

说明(解释)

broker.id =0

每一个broker在集群中的唯一表示,要求是正数。当该服务器的IP地址发生改变时,broker.id没有变化,则不会影响consumers的消息情况

log.dirs=/data/kafka-logs

kafka数据的存放地址,多个地址的话用逗号分割,多个目录分布在不同磁盘上可以提高读写性能  /data/kafka-logs-1,/data/kafka-logs-2

port =9092

broker server服务端口

message.max.bytes =6525000

表示消息体的最大大小,单位是字节

num.network.threads =4

broker处理消息的最大线程数,一般情况下数量为cpu核数

num.io.threads =8

broker处理磁盘IO的线程数,数值为cpu核数2倍

background.threads =4

一些后台任务处理的线程数,例如过期消息文件的删除等,一般情况下不需要去做修改

queued.max.requests =500

等待IO线程处理的请求队列最大数,若是等待IO的请求超过这个数值,那么会停止接受外部消息,应该是一种自我保护机制。

host.name

broker的主机地址,若是设置了,那么会绑定到这个地址上,若是没有,会绑定到所有的接口上,并将其中之一发送到ZK,一般不设置

socket.send.buffer.bytes=100*1024

socket的发送缓冲区,socket的调优参数SO_SNDBUFF

socket.receive.buffer.bytes =100*1024

socket的接受缓冲区,socket的调优参数SO_RCVBUFF

socket.request.max.bytes =100*1024*1024

socket请求的最大数值,防止serverOOM,message.max.bytes必然要小于socket.request.max.bytes,会被topic创建时的指定参数覆盖

delete.topic.enable=true是否彻底删除Topic

在bin/kafka-server-start.sh文件中,设置服务器可用内存大小,内存不足时,启动会报:error='Cannot allocate memory'

export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"

4、启动

执行脚本 ./bin/kafka-server-start.sh config/server.properties &

显示以下信息表示启动正常 

5、停止

执行脚本 ./bin/kafka-server-stop.sh

6、创建Topic

bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test

7、列出所有Topic

bin/kafka-topics.sh --list --zookeeper 127.0.0.1:2181

8、删除Topic

bin/kafka-topics.sh --delete --zookeeper 127.0.0.1:2181 --topic test

如果没有在配置里设置彻底删除Topic,此处则只是将该Topic标志为删除

9、发送数据

bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test
// 输入内容
> 111111
> 222222
> 333333
> 444444

10、消费数据

bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic test --from-beginning
// 输出内容
111111
222222
333333
444444

 

 

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐