一、简介

1.消息队列解决的问题

针对于同步通信的方式来说,异步的方式可以让"上游系统" 快速成功(快速完成响应),极大提高了系统的吞吐量。
本质上是改变的软件之间的 通信方式 将 同步改变成了异步

2.消息队列的分类

2.1 有borker

(1)重topic

​ 其中kafka、rocketMQ、ActiveMq都属于重topic

(2)轻topic

​ rabbitMQ

2.2 无borker

zeroMQ就是无borker的消息队列。在生产者和消费者之间没有使用borker,直接使用socket进行通信

3.kafka使用场景

3.1 日志收集

收集各种服务、系统日志,然后在由ELK或者hadoop、hbase等进行消费

3.2 消息系统

解耦生产者和消费者、缓存消息等

3.3 用户活动跟踪

记录web用户或者app用户各种活动,浏览网页、搜索、点击等活动

4.kafka的基本概念

4.1 消息(Message)

​ Kafka通过主题(Topic)来组织消息。消息可以是任何形式的数据,通常是键值对的形式。

4.2 主题(Topic)

​ 主题是消息的逻辑分类,kafka根据topic对消息进行分类,发布到kakfa集群的每条消息都需要指定一个topic

4.3 生产者(Producer)

​ 生产者负责将消息发布到Kafka的主题中。

4.4 消费者(Consumer)

​ 消费者订阅一个或多个主题,并从中接收消息。

4.5 代理(Broker)

​ Kafka集群由多个代理组成,每个代理都是一个独立的Kafka服务器节点,负责存储和处理消息。

4.6 分区(Partition)

​ 每个主题可以分为一个或多个分区,每个分区是有序的消息序列。分区使得Kafka可以水平扩展,并允许消息并行处理。

4.7 复制(Replication)

​ Kafka通过复制机制确保数据的持久性和容错性。每个分区可以配置多个副本,其中一个是领导者(Leader),其余是追随者(Follower)。领导者负责处理读写请求,而追随者则复制领导者的数据以提供容错性。

4.8 ZooKeeper

ZooKeeper是Kafka用于集群管理和协调的关键组件。它负责管理Kafka集群的状态、配置信息和领导者选举等任务。

二、安装前置工作

1.服务器列表

操作系统服务角色IP地址
centos7.7kafka01,zookeeper01192.168.1.100
centos7.7kafka02,zookeeper02192.168.1.101
centos7.7kafka03,zookeeper03192.168.1.102

2.安装jdk1.8

三台都需要安装jdk1.8. jdk尽量不要安装openjdk

jdk-8u91-linux-x64.tar.gz

测试java命令是否能够使用

[root@node3 bin]# java -version
java version "1.8.0_91"
Java(TM) SE Runtime Environment (build 1.8.0_91-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.91-b14, mixed mode)

3.下载kafka

wget https://downloads.apache.org/kafka/3.7.1/kafka_2.12-3.7.1.tgz

三.安装zookeeper

kafka安装包是二进制的,其中包含了zookeeper组件。这里先配置zookeeper

[root@node-1 ~]# tar xf kafka_2.12-3.7.1.tgz 
[root@node-1 ~]# mv kafka_2.12-3.7.1 /usr/local/kafka3.7

1.修改配置文件

三台都需改,这一步操作完全一致。配置文件内容也一致

[root@node-1 ~]# mkdir -p /data/zookeeper/{data,logs}
[root@node-1 ~]# cd /usr/local/kafka3.7/config/
[root@node-1 config]# cp zookeeper.properties zookeeper.properties_backup
[root@node-1 config]# vim zookeeper.properties
# 修改参数
dataDir=/data/zookeeper/data


# 新增参数
dataLogDir=/data/zookeeper/logs
initLimit=10
syncLimit=5
tickTime=2000

# 新增集群参数
server.0=192.168.1.100:12888:13888
server.1=192.168.1.101:12888:13888
server.2=192.168.1.102:12888:13888

解释:

server.A=B:C:D
1. A是一个数字,表示这个是第几号服务器。集群模式下需要在zookeeper.properties中dataDir指定的目录下创建一个文件myid,
	这个文件里面有一个数据就是A的值,Zookeeper启动时读取此文件,拿到里面的数据与zookeeper.properties里面的配置信息比较从而判断到底是哪个server。
2. B是这个服务器的地址。
3. C是这个服务器Follower与集群中的Leader服务器交换信息的端口。
4. D是万一集群中的Leader服务器挂了,需要一个端口来重新进行选举,选出一个新的Leader,而这个端口就是用来执行选举时服务器相互通信的端口

2.创建myid文件

1.100服务器

echo 0 > /data/zookeeper/data/myid

1.101服务器

echo 1 > /data/zookeeper/data/myid

1.102服务器

echo 2 > /data/zookeeper/data/myid

3.启动zookeeper集群

3.1 后台启动

三台服务器全部启动

[root@node-1 ~]# cd /usr/local/kafka3.7/
[root@node-1 kafka3.7]# ./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

如果是用于测试可以在前台启动

./bin/zookeeper-server-start.sh  config/zookeeper.properties 

3.2 查看进程

[root@node-1 kafka3.7]# ps aux |grep zookeeper 

3.2 分别查看端口

1.100服务器

[root@node-1 ~]# netstat -antp |grep -i listen |egrep "2181|12888|13888"
tcp6       0      0 192.168.1.100:13888     :::*                    LISTEN      7296/java           
tcp6       0      0 :::2181                 :::*                    LISTEN      7296/java

1.101服务器

[root@node-2 kafka3.7]# netstat -antp |grep -i listen |egrep "2181|12888|13888"
tcp6       0      0 192.168.1.101:13888     :::*                    LISTEN      8640/java           
tcp6       0      0 :::2181                 :::*                    LISTEN      8640/java 

1.102服务器

[root@localhost kafka3.7]# netstat -antp |grep -i listen |egrep "2181|12888|13888"
tcp6       0      0 192.168.1.102:13888     :::*                    LISTEN      10571/java          
tcp6       0      0 :::2181                 :::*                    LISTEN      10571/java          
tcp6       0      0 192.168.1.102:12888     :::*                    LISTEN      10571/java 

    
# 这是发现另外两台已经连接过来,形成了zookeeper集群
[root@localhost kafka3.7]# netstat -antp |egrep "12888"
tcp6       0      0 192.168.1.102:12888     :::*                    LISTEN      10571/java          
tcp6       0      0 192.168.1.102:12888     192.168.1.101:59732     ESTABLISHED 10571/java          
tcp6       0      0 192.168.1.102:12888     192.168.1.100:51438     ESTABLISHED 10571/java 

验证,可以关闭1.102的zookeeper.发现12888端口会在其他服务器上启动。

四、安装kafka

1.创建日志目录

[root@node-1 ~]# mkdir -p /data/kafka/logs

2.修改配置文件

[root@node-1 ~]# cd /usr/local/kafka3.7/config/
[root@node-1 config]# cp server.properties server.properties_backup
[root@node-1 config]# grep -v "#" server.properties_backup  |grep -v "^$" > server.properties
# 注意修改。 101服务器的id=1    102服务器的id=2
broker.id=0
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

# 修改对应目录
log.dirs=/data/kafka/logs/kafka-logs
# 修改对应地址。监听本机的IP地址
listeners=PLAINTEXT://192.168.1.100:9092
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
#连接zaookeeper集群
zookeeper.connect=192.168.1.100:2181,192.168.1.101:2181,192.168.1.102:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0

将配置文件复制到另外两台,注意修改对应的内容

3.启动kakfka

启动三台kafka的命令一致

[root@node-1 ~]# cd /usr/local/kafka3.7/
[root@node-1 kafka3.7]# ./bin/kafka-server-start.sh -daemon ./config/server.properties

4.查看端口

netstat -antp |grep 9092

5.查看zookeeper的kafka情况

5.1 连接zookeeper

[root@node-1 ~]# cd /usr/local/kafka3.7/bin/
[root@node-1 bin]# ./zookeeper-shell.sh 192.168.1.102:2181

输入

ls /

返回结果

[admin, brokers, cluster, config, consumers, controller, controller_epoch, feature, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]

5.2 查看注册的brokers

这里的3个id就是kafka的borkers id。可以尝试关闭一个kafka节点。这里的缺少对应关闭的节点

ls /brokers/ids
[0, 1, 2]

五、基本操作

这里注意:
(1)创建topic的命令和以前的版本(2.x)不一样了。以前是指定–zookeeper。
(2)topic名称命名非常严谨。不能包含任何特殊字符和数字。即使能够创建成功,消费的时候也会有问题
(3)–bootstrap-server 指定的是kafka集群地址,也就是broker的地址

1.创建topic

./kafka-topics.sh --bootstrap-server 192.168.1.100:9092,192.168.1.101:9092,192.168.1.102:9092 --create --topic 自定义名称

2.删除topic

./kafka-topics.sh --bootstrap-server 192.168.1.100:9092,192.168.1.101:9092,192.168.1.102:9092 --delete --topic 已经存在的topic名称

3.显示topic列表

./kafka-topics.sh --bootstrap-server 192.168.1.100:9092,192.168.1.101:9092,192.168.1.102:9092 --list

4.启动生产者

这是向名称为test的topic中生产数据

./kafka-console-producer.sh --broker-list 192.168.1.100:9092,192.168.1.101:9092,192.168.1.102:9092 --topic  test

5.启动消费者

5.1 匿名消费组

这样启动的消费者是一个kafka随机生成消费组的消费者。一个终端就是只有一个消费者的消费组

./kafka-console-consumer.sh --bootstrap-server 192.168.1.100:9092,192.168.1.101:9092,192.168.1.102:9092 --topic test

5.2 指定消费组名

启动了一个名称为testgroup的消费组

./kafka-console-consumer.sh --bootstrap-server 192.168.1.100:9092,192.168.1.101:9092,192.168.1.102:9092 --topic test --consumer-property group.id=testgroup

6.查看消费组

6.1 查看所有消费组

在两个终端上分别执行

./kafka-console-consumer.sh --bootstrap-server 192.168.1.100:9092,192.168.1.101:9092,192.168.1.102:9092 --topic test

这是在两个终端上启动了两个消费组,每个消费组里只有一个消费者。
查看所有消费组命令如下:

[root@node3 bin]# ./kafka-consumer-groups.sh --bootstrap-server 192.168.1.100:9092,192.168.1.101:9092,192.168.1.102:9092 --list
console-consumer-95829
console-consumer-26618

6.2 查看所有消费组详细信息

./kafka-consumer-groups.sh --bootstrap-server 192.168.1.100:9092,192.168.1.101:9092,192.168.1.102:9092 --all-groups --describe

6.3 查看指定消费组信息

查看指定消费组的详细信息

./kafka-consumer-groups.sh --bootstrap-server 192.168.1.100:9092,192.168.1.101:9092,192.168.1.102:9092 --group testgroup --describe

内容如下:

# 这里也提示了 该消费组中,没有任何的消费者
Consumer group 'testgroup' has no active members.

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
testgroup       test            0          75              122             47              -               -               -

解释:
current-offset: 当前消费到第几条消息
log-end-offset: 消息最后的偏移量,也就是消息总量条数
lag: 还有多少条消息没有被消费
consumer-id,host,client-id 都为空说明没有任何消费者

有消费者的显示是这样的

GROUP      TOPIC   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG     CONSUMER-ID                                           HOST            CLIENT-ID
testgroup  test      0          122             122             0    console-consumer-763e791e-3207-4832-ac49-7dc752c7ce05 /192.168.1.19   console-consumer

6.4 查看指定组内的成员

./kafka-consumer-groups.sh --bootstrap-server 192.168.1.100:9092,192.168.1.101:9092,192.168.1.102:9092 --group testgroup --members --describe

7.删除消费组

删除名为console-consumer-26618消费组.
删除时要注意:要停止消费者.消费组内必须要保证时空的

./kafka-consumer-groups.sh --bootstrap-server 192.168.1.100:9092,192.168.1.101:9092,192.168.1.102:9092 --delete --group console-consumer-26618

六、分区

1.概念

一个topic可以分为多个分区。在kakfa集群中,在同一个topic中,多个分区会分布在不同的kafka节点上。
多个分区的主要作用是:

1.可以将消息分布式存储,解决单个消息文件过大的问题。
2.可以并行写

2.基本操作

2.1 创建分区

创建了一个名为message的topic,然后这个topic有3个分区

./kafka-topics.sh --bootstrap-server 192.168.1.100:9092,192.168.1.101:9092,192.168.1.102:9092 --create --topic message --partitions 3

查看tocpic分区的信息。这里可以看出其中PartitionCount 参数显示为3.说明分区内容为3个

[root@node-1 bin]# ./kafka-topics.sh --bootstrap-server 192.168.1.100:9092,192.168.1.101:9092,192.168.1.102:9092  --describe --topic message
Topic: message	TopicId: dwCMgzgmSMeNdcNeVn9efw	PartitionCount: 3	ReplicationFactor: 1	Configs: 
	Topic: message	Partition: 0	Leader: 1	Replicas: 1	Isr: 1
	Topic: message	Partition: 1	Leader: 0	Replicas: 0	Isr: 0
	Topic: message	Partition: 2	Leader: 2	Replicas: 2	Isr: 2

解释:

(1)leader: kafka读写操作,都发生在leader上,leader负责把数据同步给follower,当leader宕机,经过主从选举,从多个follower中选举产生一个新的leader.
这里的leader id 是指的kafka broker的 id
(2)follower:接受leader同步过来的数据
(3)lsr:可以同步和已同步的节点会被存放到lsr集合中

2.2 查看分区分布

1.100节点

[root@node-1 ~]# cd /data/kafka/logs/kafka-logs/
[root@node-1 kafka-logs]# ls |grep message
message-1

1.101节点

[root@node-2 ~]# cd /data/kafka/logs/kafka-logs/
[root@node-2 kafka-logs]# ls |grep message
message-0

1.102节点

[root@node-3 ~]# cd /data/kafka/logs/kafka-logs/
[root@node-3 kafka-logs]# ls |grep message
message-2

这里可以看出(1)message的topic被分为了3个分区。(2)3个分区分别在3个不同的节点上

3.不指定分区生产数据

3.1 启动一个生产者

启动的生产者没有指定分区

./kafka-console-producer.sh --broker-list 192.168.1.100:9092,192.168.1.101:9092,192.168.1.102:9092 --topic  message

3.2 启动一个消费者

启动的消费者也没有指定分区

./kafka-console-consumer.sh --bootstrap-server 192.168.1.100:9092,192.168.1.101:9092,192.168.1.102:9092 --topic message

3.3 生产数据

[root@node2 bin]# ./kafka-console-producer.sh --broker-list 192.168.1.100:9092,192.168.1.101:9092,192.168.1.102:9092 --topic  message
>1
>2
>3
>4
>5
>6
>7
>8
>9
>0
>hello
>world
>abc123
>hahaha
>zhangsan

3.4 查看消费数据情况

这里发现消费者正常消费完毕数据

[root@node3 bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.1.100:9092,192.168.1.101:9092,192.168.1.102:9092 --topic message
1
2
3
4
5
6
7
8
9
0
hello
world
abc123
hahaha
zhangsan

3.5 查看消费组详细信息

[root@localhost bin]# ./kafka-consumer-groups.sh --bootstrap-server 192.168.1.100:9092,192.168.1.101:9092,192.168.1.102:9092  --list
console-consumer-60694
[root@localhost bin]# ./kafka-consumer-groups.sh --bootstrap-server 192.168.1.100:9092,192.168.1.101:9092,192.168.1.102:9092 --describe --group console-consumer-60694

GROUP                  TOPIC     PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG   CONSUMER-ID                                           HOST            CLIENT-ID
console-consumer-60694 message   0          -               0               -     console-consumer-c1eb5cfe-4c5c-4950-b477-1671d1b6ffaa /192.168.1.19   console-consumer
console-consumer-60694 message   1          -               0               -     console-consumer-c1eb5cfe-4c5c-4950-b477-1671d1b6ffaa /192.168.1.19   console-consumer
console-consumer-60694 message   2          -               15              -     console-consumer-c1eb5cfe-4c5c-4950-b477-1671d1b6ffaa /192.168.1.19   console-consumer

这里可以看出,所有的数据都生产到了分区2上。

3.6 启动第二个生产者

依然正常生产数据

[root@node2 bin]# ./kafka-console-producer.sh --broker-list 192.168.1.100:9092,192.168.1.101:9092,192.168.1.102:9092 --topic  message
>m1
>m2
>m3
>m4
>m5
>m6
>m7
>m8

查看消费者,也正常收到了了数据

3.7 查看消费组详细信息

[root@localhost bin]# ./kafka-consumer-groups.sh --bootstrap-server 192.168.1.100:9092,192.168.1.101:9092,192.168.1.102:9092 --describe --group console-consumer-60694

GROUP                  TOPIC    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG   CONSUMER-ID                                           HOST            CLIENT-ID
console-consumer-60694 message  0          -               0               -     console-consumer-c1eb5cfe-4c5c-4950-b477-1671d1b6ffaa /192.168.1.19   console-consumer
console-consumer-60694 message  1          -               8               -     console-consumer-c1eb5cfe-4c5c-4950-b477-1671d1b6ffaa /192.168.1.19   console-consumer
console-consumer-60694 message  2          -               18              -     console-consumer-c1eb5cfe-4c5c-4950-b477-1671d1b6ffaa /192.168.1.19   console-consumer

此时发现,第二个生产者的数据被生产到了分区1上.
从这里可以看出,如果不指定分区生产数据,kafka会自动均衡的分配在各个分区上。

3.8 注意

这里的current-offset和logend-offset没有值。这是因为我们使用的消费者是kafka console。自定义一个 具体消费组名的消费者就不会出现这种情况。

4.指定分区生产数据

4.1 生产者

kafka自带的终端工具脚本,无法向指定的分区生产数据。必须使用程序进行编写,才可以向指定的分区进行生产数据。

4.2 消费指定分区数据

注意,使用kafka自带的工具指定分区进行消费时,是不能指定自定义消费组的。两个参数冲突。

[root@localhost bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.1.100:9092,192.168.1.101:9092,192.168.1.102:9092 --topic message --partition 1

4.3 使用python消费指定分区

安装

pip3 install kafka-python

查看版本

[root@node2 ~]# pip3 freeze |grep kafka
kafka-python==2.0.2

代码如下

import logging,datetime,time
from kafka.consumer import KafkaConsumer


topic = "xxxxxx"
bootstrap_server = ["xx.xx.xx.xx:9092","xx.xx.xx.xx:9092","xx.xx.xx.xx:9092"]
cmr = KafkaConsumer(topic,bootstrap_servers=bootstrap_server,group_id=None,auto_offset_reset='smallest')


logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s  %(message)s",
    datefmt='%Y-%m-%d  %H:%M:%S',
    filename="kafka_topic_{}.log".format(topic),
    filemode='a'
)



def log(msg):
    logging.info(msg)

for msg in cmr:
    Topic = msg.topic
    Offset= msg.offset
    Time = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(msg.timestamp/1000))
    Partition = msg.partition
    Content = msg.value.decode('utf-8')
    result = "time: {} topic: {}  partirion:{} offset: {}  content: {}".format(Time,Topic,Partition,Offset,Content)
    log(result)
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐