kafka学习(二):单机环境搭建、基本使用
下载kafka很简单,可以使用源码的方式和安装包的方式安装。这里使用安装包的方式进行安装,只需要进行解压运行即可。源码下载地址:https://archive.apache.org/dist/kafka/2.5.0/kafka-2.5.0-src.tgz安装包下载地址:https://archive.apache.org/dist/kafka/2.5.0/kafka_2.12-2.5.0.tgz。
1、单机环境搭建
1.1、下载kafka
下载kafka很简单,可以使用源码的方式和安装包的方式安装。这里使用安装包的方式进行安装,只需要进行解压运行即可。
源码下载地址:https://archive.apache.org/dist/kafka/2.5.0/kafka-2.5.0-src.tgz
安装包下载地址:https://archive.apache.org/dist/kafka/2.5.0/kafka_2.12-2.5.0.tgz
1.2、解压
解压 kafka_2.12-2.5.0.tgz,得到 "kafka_2.12-2.5.0 " 文件夹。
1.3、配置zookeeper.properties
kafka需要安装zookeeper使用,但kafka已集成zookeeper,在单机搭建时可直接使用。
使用需配置kafka_2.12-2.5.0/config 下的“zookeeper.properties”。
注: kafka必须配置zookeeper 否则无法启动,无论是单机还是集群。
配置"zookeeper.properties":
修改dataDir和clientPort。前者是快照存放地址(自己随意配置),后者是客户端连接zookeeper服务的端口。默认端口2181
1.4、kafka配置server.properties
配置kafka_2.12-2.5.0/config下的“server.properties”,修改log.dirs和zookeeper.connect。前者是日志存放文件夹,后者是zookeeper连接地址(端口和clientPort保持一致)。
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/root/kafka_2.12-2.5.0/data/kafka-logs
############################# Socket Server Settings #############################
# 公网地址配置
advertised.listeners=PLAINTEXT://43.142.243.124:9092
# 内网配置:
#listeners=PLAINTEXT://43.142.243.124:9092
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181
注意:若需要外部访问,一定需要配置listeners , 默认为本机IP 、端口默认9092。
2、kafka的启动
2.1、开启kafka自带zookeeper
# 后台启动zookeeper
nohup ./bin/zookeeper-server-start.sh ./config/zookeeper.properties > ./zookeeper-run.log 2>&1 &
2.2、开启kafka
# 启动kafka
nohup ./bin/kafka-server-start.sh ./config/server.properties > ./kafka-run.log 2>&1 &
使用jps命令查看是否正常了( jps是jdk提供的一个查看当前java进程的小工具) :
3、kafka的基本使用
3.1、创建自定义的topic
# 创建test 主题
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
其中zookeeper 可以使用hostname 也可以使用IP,若使用hostname,请注意hosts 即/etc/hosts 与hostname最好保持一致!
说明:
--create # 代表创建
--zookeeper 服务器地址:端口 # 连接的zookeeper地址及端口,如果是集群,可以只写一个,也可写多个。
--topic 名称 # 设置topic的名称
--partitions 数量 # 设置主题的分区数量
--replication-factor 副本数量 # 用来设置主题的副本数。每个主题可以有多个副本,副本位于集群中不同的broker上,也就是说副本的数量不能超过broker的数量,否则创建主题时会失败。
即:总分区数= partitions * replication-factor
该值的 副本数已把主分区数包含在内。
例如:
副本数是1 ,意思是就一个分区,同时也是主分区。即:没有备份。
副本数是2,意思是有2个分区,1个是主分区,1个是从分区。即:一个备份。
注:
- 副本数量要小于等于分区数量。
- 主题的分区,本质上就是一个文件目录。
分区目录的命名规则:主题名-分区编号(分区编号从0开始)
例如:test-0
- kafka主题引入分区机制的作用:可以分布式的对一个主题的数据进行存储和管理。
主题的分区数量可以大于kafka的broker服务器数量。Kafka底层尽可能确保分区目录的负载均衡。例如:一个主题有10个分区,但只有3个broker服务器,则分区目录的数量:3-3-4。
【topic的命名规范】:
topic的命名同样不推荐(虽然可以这样做)使用双下划线“__”开头,因为以双下划线开头的topic一般看作是kafka的内部topic,比如__consumer_offsets和__transaction_state。
topic的名称必须由大小写字母、数字、“.”、“-”、“_”组成,不能为空、不能为“.”、不能为“…”,且长度不能超过249。
3.2、查看所有topic列表
# 查看所有topic主题
./bin/kafka-topics.sh --list --zookeeper localhost:2181
说明:
--list # 查看所有topic
--zookeeper 服务器地址:端口 # 连接的zookeeper地址及端口,如果是集群,可以只写一个,也可写多个。
示例:
3.3、查看某个主题的详细信息
# 查看主题的详细信息
./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test
3.4、启动producer测试线程
即:启动一个生产者线程。
#生产者
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
3.5、启动consumer测试线程
即:启动一个消费者线程。
#消费者
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
–from-beginning 是从头开始消费
可以通过producer、consumer模拟消息的发送、接收。
生产、消费示例:
4、kafka命令中的参数说明
参数 | 值类型 | 说明 | 有效值 |
--topic | string | 被消费的topic | |
--whitelist | string | 正则表达式,指定要包含以供使用的主题的白名单 | |
--partition | integer | 指定分区 | |
除非指定’–offset’,否则从分区结束(latest)开始消费 | |||
--offset | string | 执行消费的起始offset位置 | latest |
默认值:latest | earliest | ||
<offset> | |||
--consumer-property | string | 将用户定义的属性以key=value的形式传递给使用者 | |
--consumer.config | string | 消费者配置属性文件 | |
请注意,[consumer-property]优先于此配置 | |||
--formatter | string | 用于格式化kafka消息以供显示的类的名称 | kafka.tools.DefaultMessageFormatter |
默认值:kafka.tools.DefaultMessageFormatter | kafka.tools.LoggingMessageFormatter | ||
kafka.tools.NoOpMessageFormatter | |||
kafka.tools.ChecksumMessageFormatter | |||
--property | string | 初始化消息格式化程序的属性 | print.timestamp=true|false |
print.key=true|false | |||
print.value=true|false | |||
key.separator=<key.separator> | |||
line.separator=<line.separator> | |||
key.deserializer=<key.deserializer> | |||
value.deserializer=<value.deserializer> | |||
--from-beginning | 从存在的最早消息开始,而不是从最新消息开始 | ||
--max-messages | integer | 消费的最大数据量,若不指定,则持续消费下去 | |
--timeout-ms | integer | 在指定时间间隔内没有消息可用时退出 | |
--skip-message-on-error | 如果处理消息时出错,请跳过它而不是暂停 | ||
--bootstrap-server | string | 必需(除非使用旧版本的消费者),要连接的服务器 | |
--key-deserializer | string | ||
--value-deserializer | string | ||
--enable-systest-events | 除记录消费的消息外,还记录消费者的生命周期 | ||
(用于系统测试) | |||
--isolation-level | string | 设置为read_committed以过滤掉未提交的事务性消息 | |
设置为read_uncommitted以读取所有消息 | |||
默认值:read_uncommitted | |||
--group | string | 指定消费者所属组的ID | |
--blacklist | string | 要从消费中排除的主题黑名单 | |
--csv-reporter-enabled | 如果设置,将启用csv metrics报告器 | ||
--delete-consumer-offsets | 如果指定,则启动时删除zookeeper中的消费者信息 | ||
--metrics-dir | string | 输出csv度量值 | |
需与[csv-reporter-enable]配合使用 | |||
--zookeeper | string | 必需(仅当使用旧的使用者时)连接zookeeper的字符串。 | |
可以给出多个URL以允许故障转移 |
更多推荐
所有评论(0)