kafka
一、安装kafka1.需要安装jdk2.安装kafka这里用的kafka是二进制安装包[root@oracle ~]# tar xf kafka_2.12-2.8.1.tgz-C /usr/local/[root@oracle ~]# cd /usr/local/[root@oracle local]# mv kafka_2.12-2.8.1/ kafka[root@oracle ~]# cd /
一、简介
1.消息队列解决的问题
针对于同步通信的方式来说,异步的方式可以让"上游系统" 快速成功(快速完成响应),极大提高了系统的吞吐量。
本质上是改变的软件之间的 通信方式 将 同步改变成了异步
2.消息队列的分类
2.1 有borker
(1)重topic
其中kafka、rocketMQ、ActiveMq都属于重topic
(2)轻topic
rabbitMQ
2.2 无borker
zeroMQ就是无borker的消息队列。在生产者和消费者之间没有使用borker,直接使用socket进行通信
3.kafka使用场景
3.1 日志收集
收集各种服务、系统日志,然后在由ELK或者hadoop、hbase等进行消费
3.2 消息系统
解耦生产者和消费者、缓存消息等
3.3 用户活动跟踪
记录web用户或者app用户各种活动,浏览网页、搜索、点击等活动
4.kafka的基本概念
4.1 消息(Message)
Kafka通过主题(Topic)来组织消息。消息可以是任何形式的数据,通常是键值对的形式。
4.2 主题(Topic)
主题是消息的逻辑分类,kafka根据topic对消息进行分类,发布到kakfa集群的每条消息都需要指定一个topic
4.3 生产者(Producer)
生产者负责将消息发布到Kafka的主题中。
4.4 消费者(Consumer)
消费者订阅一个或多个主题,并从中接收消息。
4.5 代理(Broker)
Kafka集群由多个代理组成,每个代理都是一个独立的Kafka服务器节点,负责存储和处理消息。
4.6 分区(Partition)
每个主题可以分为一个或多个分区,每个分区是有序的消息序列。分区使得Kafka可以水平扩展,并允许消息并行处理。
4.7 复制(Replication)
Kafka通过复制机制确保数据的持久性和容错性。每个分区可以配置多个副本,其中一个是领导者(Leader),其余是追随者(Follower)。领导者负责处理读写请求,而追随者则复制领导者的数据以提供容错性。
4.8 ZooKeeper
ZooKeeper是Kafka用于集群管理和协调的关键组件。它负责管理Kafka集群的状态、配置信息和领导者选举等任务。
二、安装前置工作
1.服务器列表
操作系统 | 服务角色 | IP地址 |
---|---|---|
centos7.7 | kafka01,zookeeper01 | 192.168.1.100 |
centos7.7 | kafka02,zookeeper02 | 192.168.1.101 |
centos7.7 | kafka03,zookeeper03 | 192.168.1.102 |
2.安装jdk1.8
三台都需要安装jdk1.8. jdk尽量不要安装openjdk
jdk-8u91-linux-x64.tar.gz
测试java命令是否能够使用
[root@node3 bin]# java -version
java version "1.8.0_91"
Java(TM) SE Runtime Environment (build 1.8.0_91-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.91-b14, mixed mode)
3.下载kafka
wget https://downloads.apache.org/kafka/3.7.1/kafka_2.12-3.7.1.tgz
三.安装zookeeper
kafka安装包是二进制的,其中包含了zookeeper组件。这里先配置zookeeper
[root@node-1 ~]# tar xf kafka_2.12-3.7.1.tgz
[root@node-1 ~]# mv kafka_2.12-3.7.1 /usr/local/kafka3.7
1.修改配置文件
三台都需改,这一步操作完全一致。配置文件内容也一致
[root@node-1 ~]# mkdir -p /data/zookeeper/{data,logs}
[root@node-1 ~]# cd /usr/local/kafka3.7/config/
[root@node-1 config]# cp zookeeper.properties zookeeper.properties_backup
[root@node-1 config]# vim zookeeper.properties
# 修改参数
dataDir=/data/zookeeper/data
# 新增参数
dataLogDir=/data/zookeeper/logs
initLimit=10
syncLimit=5
tickTime=2000
# 新增集群参数
server.0=192.168.1.100:12888:13888
server.1=192.168.1.101:12888:13888
server.2=192.168.1.102:12888:13888
解释:
server.A=B:C:D
1. A是一个数字,表示这个是第几号服务器。集群模式下需要在zookeeper.properties中dataDir指定的目录下创建一个文件myid,
这个文件里面有一个数据就是A的值,Zookeeper启动时读取此文件,拿到里面的数据与zookeeper.properties里面的配置信息比较从而判断到底是哪个server。
2. B是这个服务器的地址。
3. C是这个服务器Follower与集群中的Leader服务器交换信息的端口。
4. D是万一集群中的Leader服务器挂了,需要一个端口来重新进行选举,选出一个新的Leader,而这个端口就是用来执行选举时服务器相互通信的端口
2.创建myid文件
1.100服务器
echo 0 > /data/zookeeper/data/myid
1.101服务器
echo 1 > /data/zookeeper/data/myid
1.102服务器
echo 2 > /data/zookeeper/data/myid
3.启动zookeeper集群
3.1 后台启动
三台服务器全部启动
[root@node-1 ~]# cd /usr/local/kafka3.7/
[root@node-1 kafka3.7]# ./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
如果是用于测试可以在前台启动
./bin/zookeeper-server-start.sh config/zookeeper.properties
3.2 查看进程
[root@node-1 kafka3.7]# ps aux |grep zookeeper
3.2 分别查看端口
1.100服务器
[root@node-1 ~]# netstat -antp |grep -i listen |egrep "2181|12888|13888"
tcp6 0 0 192.168.1.100:13888 :::* LISTEN 7296/java
tcp6 0 0 :::2181 :::* LISTEN 7296/java
1.101服务器
[root@node-2 kafka3.7]# netstat -antp |grep -i listen |egrep "2181|12888|13888"
tcp6 0 0 192.168.1.101:13888 :::* LISTEN 8640/java
tcp6 0 0 :::2181 :::* LISTEN 8640/java
1.102服务器
[root@localhost kafka3.7]# netstat -antp |grep -i listen |egrep "2181|12888|13888"
tcp6 0 0 192.168.1.102:13888 :::* LISTEN 10571/java
tcp6 0 0 :::2181 :::* LISTEN 10571/java
tcp6 0 0 192.168.1.102:12888 :::* LISTEN 10571/java
# 这是发现另外两台已经连接过来,形成了zookeeper集群
[root@localhost kafka3.7]# netstat -antp |egrep "12888"
tcp6 0 0 192.168.1.102:12888 :::* LISTEN 10571/java
tcp6 0 0 192.168.1.102:12888 192.168.1.101:59732 ESTABLISHED 10571/java
tcp6 0 0 192.168.1.102:12888 192.168.1.100:51438 ESTABLISHED 10571/java
验证,可以关闭1.102的zookeeper.发现12888端口会在其他服务器上启动。
四、安装kafka
1.创建日志目录
[root@node-1 ~]# mkdir -p /data/kafka/logs
2.修改配置文件
[root@node-1 ~]# cd /usr/local/kafka3.7/config/
[root@node-1 config]# cp server.properties server.properties_backup
[root@node-1 config]# grep -v "#" server.properties_backup |grep -v "^$" > server.properties
# 注意修改。 101服务器的id=1 102服务器的id=2
broker.id=0
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
# 修改对应目录
log.dirs=/data/kafka/logs/kafka-logs
# 修改对应地址。监听本机的IP地址
listeners=PLAINTEXT://192.168.1.100:9092
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
#连接zaookeeper集群
zookeeper.connect=192.168.1.100:2181,192.168.1.101:2181,192.168.1.102:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
将配置文件复制到另外两台,注意修改对应的内容
3.启动kakfka
启动三台kafka的命令一致
[root@node-1 ~]# cd /usr/local/kafka3.7/
[root@node-1 kafka3.7]# ./bin/kafka-server-start.sh -daemon ./config/server.properties
4.查看端口
netstat -antp |grep 9092
5.查看zookeeper的kafka情况
5.1 连接zookeeper
[root@node-1 ~]# cd /usr/local/kafka3.7/bin/
[root@node-1 bin]# ./zookeeper-shell.sh 192.168.1.102:2181
输入
ls /
返回结果
[admin, brokers, cluster, config, consumers, controller, controller_epoch, feature, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]
5.2 查看注册的brokers
这里的3个id就是kafka的borkers id。可以尝试关闭一个kafka节点。这里的缺少对应关闭的节点
ls /brokers/ids
[0, 1, 2]
五、基本操作
这里注意:
(1)创建topic的命令和以前的版本(2.x)不一样了。以前是指定–zookeeper。
(2)topic名称命名非常严谨。不能包含任何特殊字符和数字。即使能够创建成功,消费的时候也会有问题
(3)–bootstrap-server 指定的是kafka集群地址,也就是broker的地址
1.创建topic
./kafka-topics.sh --bootstrap-server 192.168.1.100:9092,192.168.1.101:9092,192.168.1.102:9092 --create --topic 自定义名称
2.删除topic
./kafka-topics.sh --bootstrap-server 192.168.1.100:9092,192.168.1.101:9092,192.168.1.102:9092 --delete --topic 已经存在的topic名称
3.显示topic列表
./kafka-topics.sh --bootstrap-server 192.168.1.100:9092,192.168.1.101:9092,192.168.1.102:9092 --list
4.启动生产者
这是向名称为test的topic中生产数据
./kafka-console-producer.sh --broker-list 192.168.1.100:9092,192.168.1.101:9092,192.168.1.102:9092 --topic test
5.启动消费者
5.1 匿名消费组
这样启动的消费者是一个kafka随机生成消费组的消费者。一个终端就是只有一个消费者的消费组
./kafka-console-consumer.sh --bootstrap-server 192.168.1.100:9092,192.168.1.101:9092,192.168.1.102:9092 --topic test
5.2 指定消费组名
启动了一个名称为testgroup的消费组
./kafka-console-consumer.sh --bootstrap-server 192.168.1.100:9092,192.168.1.101:9092,192.168.1.102:9092 --topic test --consumer-property group.id=testgroup
6.查看消费组
6.1 查看所有消费组
在两个终端上分别执行
./kafka-console-consumer.sh --bootstrap-server 192.168.1.100:9092,192.168.1.101:9092,192.168.1.102:9092 --topic test
这是在两个终端上启动了两个消费组,每个消费组里只有一个消费者。
查看所有消费组命令如下:
[root@node3 bin]# ./kafka-consumer-groups.sh --bootstrap-server 192.168.1.100:9092,192.168.1.101:9092,192.168.1.102:9092 --list
console-consumer-95829
console-consumer-26618
6.2 查看所有消费组详细信息
./kafka-consumer-groups.sh --bootstrap-server 192.168.1.100:9092,192.168.1.101:9092,192.168.1.102:9092 --all-groups --describe
6.3 查看指定消费组信息
查看指定消费组的详细信息
./kafka-consumer-groups.sh --bootstrap-server 192.168.1.100:9092,192.168.1.101:9092,192.168.1.102:9092 --group testgroup --describe
内容如下:
# 这里也提示了 该消费组中,没有任何的消费者
Consumer group 'testgroup' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
testgroup test 0 75 122 47 - - -
解释:
current-offset: 当前消费到第几条消息
log-end-offset: 消息最后的偏移量,也就是消息总量条数
lag: 还有多少条消息没有被消费
consumer-id,host,client-id 都为空说明没有任何消费者
有消费者的显示是这样的
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
testgroup test 0 122 122 0 console-consumer-763e791e-3207-4832-ac49-7dc752c7ce05 /192.168.1.19 console-consumer
6.4 查看指定组内的成员
./kafka-consumer-groups.sh --bootstrap-server 192.168.1.100:9092,192.168.1.101:9092,192.168.1.102:9092 --group testgroup --members --describe
7.删除消费组
删除名为console-consumer-26618消费组.
删除时要注意:要停止消费者.消费组内必须要保证时空的
./kafka-consumer-groups.sh --bootstrap-server 192.168.1.100:9092,192.168.1.101:9092,192.168.1.102:9092 --delete --group console-consumer-26618
六、分区
1.概念
一个topic可以分为多个分区。在kakfa集群中,在同一个topic中,多个分区会分布在不同的kafka节点上。
多个分区的主要作用是:
1.可以将消息分布式存储,解决单个消息文件过大的问题。
2.可以并行写
2.基本操作
2.1 创建分区
创建了一个名为message的topic,然后这个topic有3个分区
./kafka-topics.sh --bootstrap-server 192.168.1.100:9092,192.168.1.101:9092,192.168.1.102:9092 --create --topic message --partitions 3
查看tocpic分区的信息。这里可以看出其中PartitionCount 参数显示为3.说明分区内容为3个
[root@node-1 bin]# ./kafka-topics.sh --bootstrap-server 192.168.1.100:9092,192.168.1.101:9092,192.168.1.102:9092 --describe --topic message
Topic: message TopicId: dwCMgzgmSMeNdcNeVn9efw PartitionCount: 3 ReplicationFactor: 1 Configs:
Topic: message Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: message Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: message Partition: 2 Leader: 2 Replicas: 2 Isr: 2
解释:
(1)leader: kafka读写操作,都发生在leader上,leader负责把数据同步给follower,当leader宕机,经过主从选举,从多个follower中选举产生一个新的leader.
这里的leader id 是指的kafka broker的 id
(2)follower:接受leader同步过来的数据
(3)lsr:可以同步和已同步的节点会被存放到lsr集合中
2.2 查看分区分布
1.100节点
[root@node-1 ~]# cd /data/kafka/logs/kafka-logs/
[root@node-1 kafka-logs]# ls |grep message
message-1
1.101节点
[root@node-2 ~]# cd /data/kafka/logs/kafka-logs/
[root@node-2 kafka-logs]# ls |grep message
message-0
1.102节点
[root@node-3 ~]# cd /data/kafka/logs/kafka-logs/
[root@node-3 kafka-logs]# ls |grep message
message-2
这里可以看出(1)message的topic被分为了3个分区。(2)3个分区分别在3个不同的节点上
3.不指定分区生产数据
3.1 启动一个生产者
启动的生产者没有指定分区
./kafka-console-producer.sh --broker-list 192.168.1.100:9092,192.168.1.101:9092,192.168.1.102:9092 --topic message
3.2 启动一个消费者
启动的消费者也没有指定分区
./kafka-console-consumer.sh --bootstrap-server 192.168.1.100:9092,192.168.1.101:9092,192.168.1.102:9092 --topic message
3.3 生产数据
[root@node2 bin]# ./kafka-console-producer.sh --broker-list 192.168.1.100:9092,192.168.1.101:9092,192.168.1.102:9092 --topic message
>1
>2
>3
>4
>5
>6
>7
>8
>9
>0
>hello
>world
>abc123
>hahaha
>zhangsan
3.4 查看消费数据情况
这里发现消费者正常消费完毕数据
[root@node3 bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.1.100:9092,192.168.1.101:9092,192.168.1.102:9092 --topic message
1
2
3
4
5
6
7
8
9
0
hello
world
abc123
hahaha
zhangsan
3.5 查看消费组详细信息
[root@localhost bin]# ./kafka-consumer-groups.sh --bootstrap-server 192.168.1.100:9092,192.168.1.101:9092,192.168.1.102:9092 --list
console-consumer-60694
[root@localhost bin]# ./kafka-consumer-groups.sh --bootstrap-server 192.168.1.100:9092,192.168.1.101:9092,192.168.1.102:9092 --describe --group console-consumer-60694
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
console-consumer-60694 message 0 - 0 - console-consumer-c1eb5cfe-4c5c-4950-b477-1671d1b6ffaa /192.168.1.19 console-consumer
console-consumer-60694 message 1 - 0 - console-consumer-c1eb5cfe-4c5c-4950-b477-1671d1b6ffaa /192.168.1.19 console-consumer
console-consumer-60694 message 2 - 15 - console-consumer-c1eb5cfe-4c5c-4950-b477-1671d1b6ffaa /192.168.1.19 console-consumer
这里可以看出,所有的数据都生产到了分区2上。
3.6 启动第二个生产者
依然正常生产数据
[root@node2 bin]# ./kafka-console-producer.sh --broker-list 192.168.1.100:9092,192.168.1.101:9092,192.168.1.102:9092 --topic message
>m1
>m2
>m3
>m4
>m5
>m6
>m7
>m8
查看消费者,也正常收到了了数据
3.7 查看消费组详细信息
[root@localhost bin]# ./kafka-consumer-groups.sh --bootstrap-server 192.168.1.100:9092,192.168.1.101:9092,192.168.1.102:9092 --describe --group console-consumer-60694
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
console-consumer-60694 message 0 - 0 - console-consumer-c1eb5cfe-4c5c-4950-b477-1671d1b6ffaa /192.168.1.19 console-consumer
console-consumer-60694 message 1 - 8 - console-consumer-c1eb5cfe-4c5c-4950-b477-1671d1b6ffaa /192.168.1.19 console-consumer
console-consumer-60694 message 2 - 18 - console-consumer-c1eb5cfe-4c5c-4950-b477-1671d1b6ffaa /192.168.1.19 console-consumer
此时发现,第二个生产者的数据被生产到了分区1上.
从这里可以看出,如果不指定分区生产数据,kafka会自动均衡的分配在各个分区上。
3.8 注意
这里的current-offset和logend-offset没有值。这是因为我们使用的消费者是kafka console。自定义一个 具体消费组名的消费者就不会出现这种情况。
4.指定分区生产数据
4.1 生产者
kafka自带的终端工具脚本,无法向指定的分区生产数据。必须使用程序进行编写,才可以向指定的分区进行生产数据。
4.2 消费指定分区数据
注意,使用kafka自带的工具指定分区进行消费时,是不能指定自定义消费组的。两个参数冲突。
[root@localhost bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.1.100:9092,192.168.1.101:9092,192.168.1.102:9092 --topic message --partition 1
4.3 使用python消费指定分区
安装
pip3 install kafka-python
查看版本
[root@node2 ~]# pip3 freeze |grep kafka
kafka-python==2.0.2
代码如下
import logging,datetime,time
from kafka.consumer import KafkaConsumer
topic = "xxxxxx"
bootstrap_server = ["xx.xx.xx.xx:9092","xx.xx.xx.xx:9092","xx.xx.xx.xx:9092"]
cmr = KafkaConsumer(topic,bootstrap_servers=bootstrap_server,group_id=None,auto_offset_reset='smallest')
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(message)s",
datefmt='%Y-%m-%d %H:%M:%S',
filename="kafka_topic_{}.log".format(topic),
filemode='a'
)
def log(msg):
logging.info(msg)
for msg in cmr:
Topic = msg.topic
Offset= msg.offset
Time = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(msg.timestamp/1000))
Partition = msg.partition
Content = msg.value.decode('utf-8')
result = "time: {} topic: {} partirion:{} offset: {} content: {}".format(Time,Topic,Partition,Offset,Content)
log(result)
更多推荐
所有评论(0)