一 ,安装 :

1 ,规划 :

node01node02node03
zkzkzk
kafkakafkakafka

2 ,安装包下载 :

要这个 : Scala 2.11 - kafka_2.11-0.11.0.0.tgz (asc, md5)

http://kafka.apache.org/downloads.html
在这里插入图片描述

3 ,得到 :

kafka_2.11-0.11.0.0.tgz

4 ,上传到 :

/export/softwares

5 ,解压 :

tar -zxvf kafka_2.11-0.11.0.0.tgz -C /export/servers

6 ,创建日志文件夹 :logs

cd /export/servers/kafka_2.11-0.11.0.0
mkdir logs

7 ,修改配置文件

  1. cd /export/servers/kafka_2.11-0.11.0.0/config
  2. vim server.properties ( 4 处注意的地方 )
# broker 的全局唯一编号,不能重复( 服务器编号,从 0 开始 ) ( 手动指定 )
broker.id=0
# 删除 topic 功能使能 ( 允许删除数据 ) ( 手动指定 )
delete.topic.enable=true
# 处理网络请求的线程数量
num.network.threads=3
# 用来处理磁盘 IO 的线程数量
num.io.threads=8
# 发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
# 接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
# 请求套接字的缓冲区大小
socket.request.max.bytes=104857600
# kafka 运行日志存放的路径 ( 手动指定 )
log.dirs=/export/servers/kafka_2.11-0.11.0.0/logs
# topic 在当前 broker 上的分区个数
num.partitions=1
# 用来恢复和清理 data 下数据的线程数量
num.recovery.threads.per.data.dir=1
# segment文件保留的最长时间,超时将被删除
log.retention.hours=168
# 配置连接 Zookeeper 集群地址 ( 手动指定 )
zookeeper.connect=node01:2181,node02:2181,node03:2181

8 ,配置环境变量 :

vi /etc/profile

# KAFKA_HOME
export KAFKA_HOME=/export/servers/kafka_2.11-0.11.0.0
export PATH=$PATH:$KAFKA_HOME/bin

source /etc/profile

9 ,node02 ,node03 安装 kafka :

  1. 分发安装文件 :
cd /export/servers
scp -r kafka_2.11-0.11.0.0/ root@node02:$PWD
scp -r kafka_2.11-0.11.0.0/ root@node03:$PWD
  1. 修改 brokerid :
    cd /export/servers/kafka_2.11-0.11.0.0/config
    vim server.properties
    分别为 1,2
  2. 配置环境变量 :
    vim /etc/profile
    source /etc/profile

9 ,启动,关闭 :

  1. 启动 zk :三台机器都启动
    1 ,启动:zkServer.sh start
    2 ,查看状态 :zkServer.sh status
  2. 启动 kafka :依次在 node01,node02,node03 上启动 kafka
    1 ,进入目录 :cd /export/servers/kafka_2.11-0.11.0.0
    2 ,启动 :bin/kafka-server-start.sh config/server.properties &
    3 ,关闭 :bin/kafka-server-stop.sh stop
  3. 注意 :
    看到 kafka 启动成功后,敲一下回车
    在这里插入图片描述

二 ,命令行操作 : topic

1 ,topic , partition , message 关系

  1. topic :是一个主题,存储一个类型的数据。
  2. partition :分区,一个主题可以有多个分区。
  3. message : 具体的消息,存储在具体的分区中。
  4. 分区 : 不能跨机器,每台机器可以有多个分区。
    在这里插入图片描述

2 ,查看所有 topic

bin/kafka-topics.sh --zookeeper node01:2181 --list

3 ,创建 topic :

  1. 说明 :
    1 ,名字 : first
    2 ,分区数 : 1
    3 ,副本数 : 3 个备份
  2. 脚本
bin/kafka-topics.sh --zookeeper node01:2181 --create --replication-factor 3 --partitions 1 --topic first
  1. 成功的标志 :
    cd /export/servers/kafka_2.11-0.11.0.0/logs
    ls
    看到 0 号分区
    在这里插入图片描述

4 ,删除 topic :

bin/kafka-topics.sh --zookeeper node01:2181 --delete --topic first

三 ,命令行操作 : 生产者

1 ,目的 :

  1. 用控制台给 kafka 发数据。
  2. 发送到哪个主题 : first

2 ,命令 :

  1. 命令
bin/kafka-console-producer.sh --broker-list node01:9092 --topic first
  1. 效果:控制台等待输入
  2. 我们输入
    hello
    world

3 ,数据存储到哪里去了 :

  1. 找到文件,看内容
    cd /export/servers/kafka_2.11-0.11.0.0/logs/first-0
    cat 00000000000000000000.log
  2. 看到
    hello ,到时还有一堆乱码
    在这里插入图片描述
  3. 说明 :
    kafka 有自己的序列化存储方式。

4 ,消费某个主题的数据 :理论

  1. 默认:从现在开始消费,看不到以往的数据。
  2. 也可以:从第一条数据开始消费

5 ,消费某个主题的数据 :消费到控制台

  1. 目的 : 从开始消费数据
  2. 代码 :
bin/kafka-console-consumer.sh --zookeeper node01:2181 --from-beginning --topic first
  1. 效果 : 得到了所有数据
    在这里插入图片描述

6 ,查看某个 topic 的详情 :

bin/kafka-topics.sh --zookeeper node01:2181 --describe --topic first
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐