1、单机环境搭建

1.1、下载kafka

        下载kafka很简单,可以使用源码的方式和安装包的方式安装。这里使用安装包的方式进行安装,只需要进行解压运行即可。
源码下载地址:https://archive.apache.org/dist/kafka/2.5.0/kafka-2.5.0-src.tgz
安装包下载地址:https://archive.apache.org/dist/kafka/2.5.0/kafka_2.12-2.5.0.tgz

1.2、解压

        解压 kafka_2.12-2.5.0.tgz,得到 "kafka_2.12-2.5.0 " 文件夹。

1.3、配置zookeeper.properties

        kafka需要安装zookeeper使用,但kafka已集成zookeeper,在单机搭建时可直接使用。

        使用需配置kafka_2.12-2.5.0/config 下的“zookeeper.properties”。

注:  kafka必须配置zookeeper 否则无法启动,无论是单机还是集群。

配置"zookeeper.properties":

         修改dataDir和clientPort。前者是快照存放地址(自己随意配置),后者是客户端连接zookeeper服务的端口。默认端口2181

 1.4、kafka配置server.properties

        配置kafka_2.12-2.5.0/config下的“server.properties”,修改log.dirs和zookeeper.connect。前者是日志存放文件夹,后者是zookeeper连接地址(端口和clientPort保持一致)。

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/root/kafka_2.12-2.5.0/data/kafka-logs


############################# Socket Server Settings #############################
# 公网地址配置
advertised.listeners=PLAINTEXT://43.142.243.124:9092

# 内网配置:
#listeners=PLAINTEXT://43.142.243.124:9092


############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

注意:若需要外部访问,一定需要配置listeners , 默认为本机IP 、端口默认9092。

2、kafka的启动

2.1、开启kafka自带zookeeper

# 后台启动zookeeper
nohup ./bin/zookeeper-server-start.sh ./config/zookeeper.properties > ./zookeeper-run.log 2>&1 &

2.2、开启kafka

# 启动kafka
nohup ./bin/kafka-server-start.sh ./config/server.properties > ./kafka-run.log 2>&1 &

使用jps命令查看是否正常了( jps是jdk提供的一个查看当前java进程的小工具) :

3、kafka的基本使用

3.1、创建自定义的topic

# 创建test 主题
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

        其中zookeeper 可以使用hostname 也可以使用IP,若使用hostname,请注意hosts 即/etc/hosts 与hostname最好保持一致!

说明:

--create        # 代表创建

--zookeeper 服务器地址:端口  # 连接的zookeeper地址及端口,如果是集群,可以只写一个,也可写多个。

--topic 名称                  # 设置topic的名称

--partitions 数量          # 设置主题的分区数量

--replication-factor 副本数量         # 用来设置主题的副本数。每个主题可以有多个副本,副本位于集群中不同的broker上,也就是说副本的数量不能超过broker的数量,否则创建主题时会失败。

即:总分区数= partitions * replication-factor

该值的 副本数已把主分区数包含在内

例如:

        副本数是1 ,意思是就一个分区,同时也是主分区。即:没有备份

        副本数是2,意思是有2个分区,1个是主分区,1个是从分区。即:一个备份。

注:

  1. 副本数量要小于等于分区数量。
  2. 主题的分区,本质上就是一个文件目录

分区目录的命名规则主题名-分区编号(分区编号从0开始)  

例如:test-0

  1. kafka主题引入分区机制的作用:可以分布式的对一个主题的数据进行存储和管理

主题的分区数量可以大于kafka的broker服务器数量。Kafka底层尽可能确保分区目录的负载均衡。例如:一个主题有10个分区,但只有3个broker服务器,则分区目录的数量:3-3-4。

【topic的命名规范】

        topic的命名同样不推荐(虽然可以这样做)使用双下划线“__”开头,因为以双下划线开头的topic一般看作是kafka的内部topic,比如__consumer_offsets和__transaction_state。

        topic的名称必须由大小写字母、数字、“.”、“-”、“_”组成,不能为空、不能为“.”、不能为“…”,且长度不能超过249。

3.2、查看所有topic列表

# 查看所有topic主题
./bin/kafka-topics.sh --list --zookeeper localhost:2181

说明:

--list        # 查看所有topic

--zookeeper 服务器地址:端口          # 连接的zookeeper地址及端口,如果是集群,可以只写一个,也可写多个。

示例:

 

3.3、查看某个主题的详细信息

# 查看主题的详细信息
./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test

3.4、启动producer测试线程

即:启动一个生产者线程。

#生产者
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

3.5、启动consumer测试线程

即:启动一个消费者线程。

#消费者
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

from-beginning 是从头开始消费

可以通过producer、consumer模拟消息的发送、接收。

生产、消费示例:

4、kafka命令中的参数说明

参数值类型说明有效值
--topicstring被消费的topic
--whiteliststring正则表达式,指定要包含以供使用的主题的白名单
--partitioninteger指定分区
除非指定’–offset’,否则从分区结束(latest)开始消费
--offsetstring执行消费的起始offset位置latest
默认值:latestearliest
<offset>
--consumer-propertystring将用户定义的属性以key=value的形式传递给使用者
--consumer.configstring消费者配置属性文件
请注意,[consumer-property]优先于此配置
--formatterstring用于格式化kafka消息以供显示的类的名称kafka.tools.DefaultMessageFormatter
默认值:kafka.tools.DefaultMessageFormatterkafka.tools.LoggingMessageFormatter
kafka.tools.NoOpMessageFormatter
kafka.tools.ChecksumMessageFormatter
--propertystring初始化消息格式化程序的属性print.timestamp=true|false
print.key=true|false
print.value=true|false
key.separator=<key.separator>
line.separator=<line.separator>
key.deserializer=<key.deserializer>
value.deserializer=<value.deserializer>
--from-beginning从存在的最早消息开始,而不是从最新消息开始
--max-messagesinteger消费的最大数据量,若不指定,则持续消费下去
--timeout-msinteger在指定时间间隔内没有消息可用时退出
--skip-message-on-error如果处理消息时出错,请跳过它而不是暂停
--bootstrap-serverstring必需(除非使用旧版本的消费者),要连接的服务器
--key-deserializerstring
--value-deserializerstring
--enable-systest-events除记录消费的消息外,还记录消费者的生命周期
(用于系统测试)
--isolation-levelstring设置为read_committed以过滤掉未提交的事务性消息
设置为read_uncommitted以读取所有消息
默认值:read_uncommitted
--groupstring指定消费者所属组的ID
--blackliststring要从消费中排除的主题黑名单
--csv-reporter-enabled如果设置,将启用csv metrics报告器
--delete-consumer-offsets如果指定,则启动时删除zookeeper中的消费者信息
--metrics-dirstring输出csv度量值
需与[csv-reporter-enable]配合使用
--zookeeperstring必需(仅当使用旧的使用者时)连接zookeeper的字符串。
可以给出多个URL以允许故障转移
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐