2 ,Kafka 集群部署,启动,关闭,命令行操作
一 ,安装 :1 ,规划 :node01node02node03zkzkzkkafkakafkakafka2 ,安装包下载 :要这个 : Scala 2.11- kafka_2.11-0.11.0.0.tgz (asc, md5)http://kafka.apache.org/downloads.html3 ,得到 :kafka_2.11...
·
一 ,安装 :
1 ,规划 :
node01 | node02 | node03 |
---|---|---|
zk | zk | zk |
kafka | kafka | kafka |
2 ,安装包下载 :
要这个 : Scala 2.11 - kafka_2.11-0.11.0.0.tgz (asc, md5)
http://kafka.apache.org/downloads.html
3 ,得到 :
kafka_2.11-0.11.0.0.tgz
4 ,上传到 :
/export/softwares
5 ,解压 :
tar -zxvf kafka_2.11-0.11.0.0.tgz -C /export/servers
6 ,创建日志文件夹 :logs
cd /export/servers/kafka_2.11-0.11.0.0
mkdir logs
7 ,修改配置文件
- cd /export/servers/kafka_2.11-0.11.0.0/config
- vim server.properties ( 4 处注意的地方 )
# broker 的全局唯一编号,不能重复( 服务器编号,从 0 开始 ) ( 手动指定 )
broker.id=0
# 删除 topic 功能使能 ( 允许删除数据 ) ( 手动指定 )
delete.topic.enable=true
# 处理网络请求的线程数量
num.network.threads=3
# 用来处理磁盘 IO 的线程数量
num.io.threads=8
# 发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
# 接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
# 请求套接字的缓冲区大小
socket.request.max.bytes=104857600
# kafka 运行日志存放的路径 ( 手动指定 )
log.dirs=/export/servers/kafka_2.11-0.11.0.0/logs
# topic 在当前 broker 上的分区个数
num.partitions=1
# 用来恢复和清理 data 下数据的线程数量
num.recovery.threads.per.data.dir=1
# segment文件保留的最长时间,超时将被删除
log.retention.hours=168
# 配置连接 Zookeeper 集群地址 ( 手动指定 )
zookeeper.connect=node01:2181,node02:2181,node03:2181
8 ,配置环境变量 :
vi /etc/profile
# KAFKA_HOME
export KAFKA_HOME=/export/servers/kafka_2.11-0.11.0.0
export PATH=$PATH:$KAFKA_HOME/bin
source /etc/profile
9 ,node02 ,node03 安装 kafka :
- 分发安装文件 :
cd /export/servers
scp -r kafka_2.11-0.11.0.0/ root@node02:$PWD
scp -r kafka_2.11-0.11.0.0/ root@node03:$PWD
- 修改 brokerid :
cd /export/servers/kafka_2.11-0.11.0.0/config
vim server.properties
分别为 1,2 - 配置环境变量 :
vim /etc/profile
source /etc/profile
9 ,启动,关闭 :
- 启动 zk :三台机器都启动
1 ,启动:zkServer.sh start
2 ,查看状态 :zkServer.sh status - 启动 kafka :依次在 node01,node02,node03 上启动 kafka
1 ,进入目录 :cd /export/servers/kafka_2.11-0.11.0.0
2 ,启动 :bin/kafka-server-start.sh config/server.properties &
3 ,关闭 :bin/kafka-server-stop.sh stop - 注意 :
看到 kafka 启动成功后,敲一下回车
二 ,命令行操作 : topic
1 ,topic , partition , message 关系
- topic :是一个主题,存储一个类型的数据。
- partition :分区,一个主题可以有多个分区。
- message : 具体的消息,存储在具体的分区中。
- 分区 : 不能跨机器,每台机器可以有多个分区。
2 ,查看所有 topic
bin/kafka-topics.sh --zookeeper node01:2181 --list
3 ,创建 topic :
- 说明 :
1 ,名字 : first
2 ,分区数 : 1
3 ,副本数 : 3 个备份 - 脚本
bin/kafka-topics.sh --zookeeper node01:2181 --create --replication-factor 3 --partitions 1 --topic first
- 成功的标志 :
cd /export/servers/kafka_2.11-0.11.0.0/logs
ls
看到 0 号分区
4 ,删除 topic :
bin/kafka-topics.sh --zookeeper node01:2181 --delete --topic first
三 ,命令行操作 : 生产者
1 ,目的 :
- 用控制台给 kafka 发数据。
- 发送到哪个主题 : first
2 ,命令 :
- 命令
bin/kafka-console-producer.sh --broker-list node01:9092 --topic first
- 效果:控制台等待输入
- 我们输入
hello
world
3 ,数据存储到哪里去了 :
- 找到文件,看内容
cd /export/servers/kafka_2.11-0.11.0.0/logs/first-0
cat 00000000000000000000.log - 看到
hello ,到时还有一堆乱码
- 说明 :
kafka 有自己的序列化存储方式。
4 ,消费某个主题的数据 :理论
- 默认:从现在开始消费,看不到以往的数据。
- 也可以:从第一条数据开始消费
5 ,消费某个主题的数据 :消费到控制台
- 目的 : 从开始消费数据
- 代码 :
bin/kafka-console-consumer.sh --zookeeper node01:2181 --from-beginning --topic first
- 效果 : 得到了所有数据
6 ,查看某个 topic 的详情 :
bin/kafka-topics.sh --zookeeper node01:2181 --describe --topic first
更多推荐
已为社区贡献2条内容
所有评论(0)