MQ

传统架构的问题

Web1.0时代的网站,经过Nginx反向代理给Tomcat服务器,后端直接与MySQL交互,如果读写的请求很多,MySQL无法支撑很大的并发量,会导致MySQL宕机。

由于大多数情况写入只进行一次(注册等操作),大部分时候是读,就可以使用Redis作为读取时的缓存(将部分常用数据预加载到内存),MySQL只进行少量写的操作,从而实现读写分离,且占流量大头的读被Redis分担,解决了高并发读取的问题。

但是并发写入的数据也很多时,使用Redis将所有数据都缓存在内存原理上非常正确,实际上成本过高,这就需要低成本的消息队列来解决。消息队列例如Kafka,使用的内存+硬盘(类似HBASE的思想),解决高并发写入的问题时,性价比高。

MQ简介

消息队列是一种异步的服务间通信方式,是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。

简言之:消息队列MQ用于实现两个系统之间或者两个模块之间传递消息数据时,实现数据缓存,是中间件。

既然是基于队列实现数据缓存,当然也就符合队列的特点:FIFO(先进先出)。

需要实现实时、高性能、高吞吐、高可靠的消息传递架构中都可以考虑使用MQ。

MQ是通信方式,MQTT是物联网通讯协议(TCP是通讯层协议。MQTT是基于客户端-服务器的消息发布/订阅传输协议,也就是改良TCP的应用层通讯协议),并不是一回事。。。但是有一些相似性,使用MQTT接收下位机传感器的数据并传送给Kafka,再最终保存到HDFS是正常操作。

MQ的优缺点

优点

实现了架构解耦。例如:A→B如果要改为A→C,需要停机改代码重新编译部署,耦合度高。如果有MQ,就是A→MQ,B和C都从MQ中拉取数据即可,显然耦合度降低。

保证了最终一致性。虽然并不能保证数据实时传输,但是可以确保最终数据都被接收到。

实现了异步传输,提高了传输性能。如果A发送用时10s,B、C接收都用1000s,合计2010s。如果使用MQ,B和C可以并行接收,只需要1010s。

缺点

由于增加了MQ,架构变复杂,运维难度提高。MQ作为信息流的唯一通道,必须保证MQ是可靠的,如果MQ故障,整个系统都瘫痪。

数据保证更加复杂,必须保证生产安全和消费安全。在传输过程中需要保证数据安全(不丢包、不重复)。没有MQ时只需要保证A→B和A→C分别安全即可(TCP底层就会校验是否有丢包);有了MQ后,就得保证A、B、C与MQ的连接都是安全的。

出现故障是必然的,不出现故障是偶然的。。。采用分布式系统实现高可用可以确保MQ的可靠性。最常用的Kafka自己有机制确保不丢包、不重复。

同步与异步

提交请求→后台处理→返回处理结果,类似这种提交完毕稍加等待可以直接看到最终结果的立即一致性的就是同步处理。例如:存钱、转账,不会看到数字渐变的过程。。。常用的TCP通信协议,每次都3次握手,传输完毕后4次挥手,传输时要确保成功一次才会继续下一次。安全性好但是性能差。这就是典型的同步传输。

提交请求→放入MQ(返回临时结果)→后台继续处理,类似这种会看到过程中临时结果的情况就是异步处理。用户暂时不需要关心最终结果,只要保证最终结果正确,实现最终一致性即可。常见的UDP通信协议,不管有木有收到都直接发送下一条,速度很快,但是容易丢包。这就是典型的异步传输。上位机当然不可能像下位机那样达到上百Hz的刷新率(Winform容易卡死),UI刷新20帧就够了,死循环一般用单独的进程,也是异步。

P2P模式

负责往消息队列中写数据的就是生产者,负责缓存传递数据的就是MQ,负责从MQ读取数据的就是消费者。这种模式下,生产者往MQ写数据,消费者从MQ读取数据。读取成功,也就是消费成功后,会返回一个确认信号,MQ会把消费成功的数据删除。这样就导致数据只能被一个消费者使用,无法实现数据的共享。

订阅发布模式

生产者、消费者、MQ都与P2P一致。多了主题(Topic),用于划分存储不同业务的数据。

生产者依旧是往MQ生产数据,将数据写入到对应的主题中。消费者可以订阅主题,如果主题中出现了新的数据,(可以是多个)消费者就可以立即消费(生产者与主题是一对一的关系,主题与消费者是一对多的关系)。

一个消费者可以订阅多个主题,一个主题可以被多个消费者订阅,消费成功以后,不会立即主动删除数据。例如:订阅公众号。。。MQTT当然也是订阅发布的模式。。。

Kafka概述

简介

Kafka官网这样介绍:
Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.

翻译过来:
Apache Kafka是一个开源的分布式事件流平台,被数千家公司用于高性能数据管道、流分析、数据集成和任务关键型应用程序(分布式的基于订阅发布模式高吞吐高性能实时消息队列系统)。是用Scala开发的。异常简练:

val inputData = sc.textFile("HDFS文件地址"val wcResult = inputData
		.flatMap(line => line.split("\\s+"))
		.map(word => (word,1))
		.reduceByKey((tmp,item) => tmp+item)

val wcResult = inputData
		.flatMap(_.split("\\s+"))
		.map((_,1))
		.reduceByKey(_+_)

wcResult.saveAsText(“HDFS路径”)

比起MapReduce的自带Word Count,简练太多了。。。

Kafka主要用作分布式消息队列(分布式存储)。作为实时消息队列存储是常使用的功能。

当然Scala开发的程序都适合做运算。Kafka的KafkaStream有分布式流式计算的功能,但是有Spakr Streaming和Flink这些更强的工具,基本用不到Scala的这个功能。

特点

Zookeeper:公平节点、数据量小、实时、不用做数据存储,协调服务组件。
HDFS:离线、文件系统、永久性存储、大数据量。
Redis:实时、NoSQL数据库、大数据量临时存储、小数据量永久性存储(分布式内存)。
Hbase:实时、NoSQL数据库、大数据量永久性存储(分布式堆内存+分布式硬盘HDFS)。
Kafka:实时、消息队列、大数据量的临时性存储(分布式内存Page Cache+自己管理的分布式硬盘)。

Kafka是一个高吞吐、高性能、高灵活性、安全性的分布式基于订阅发布模式的消息队列系统:

Kafka的高性能(也就是实时读写数据)是通过分布式内存的Page Cache实现的。∵Scala兼容Java,用堆内存时如果Scala进程卡死,就会被GC回收堆内存,∴不用堆内存。PageCache是操作系统的页缓存,Kafka进程卡死后重启(不是机器重启),数据依然在内存中(类似C++的内存泄露,但不是占用的堆内存)。

Kafka的高并发是通过分布式并行读写实现的。

Kafka的高吞吐是通过分布式硬盘存储实现的,没有使用HDFS,而是自己管理硬盘,进行顺序读写。

Kafka的高可靠是通过分布式主从架构实现的。

Kafka的高安全性通过将内存数据同步到硬盘、硬盘中备份副本(∵单机存储超过1份,宕机时全部失效,故副本总数不超过机器个数)实现。

名称解释

Broker

Kafka是分布式集群,这多台机器的每个节点就是一个Broker。

Producer

生产者,负责将数据写入Kafka,一般生产数据都是使用数据采集工具,例如SqoopFlume

Consumer

消费者,负责从Kafka消费数据。

Consumer Group

Kafka中必须以消费者组的形式从Kafka中消费数据。每个消费者都属于某个消费者组,多个消费者就可以并行消费数据。∴一个消费者组中的多个消费者消费的数据是不同的,所有消费者的数据都是自己所属的消费者组数据的子集,+起来才是完整数据。

Topic

数据主题(功能上有点像元数据),用于对数据进行分类,区分不同数据。类似MySQL的,但是Kafka是分布式存储,更类似HBASE分布式表Region。一个Topic可以划分为多个分区Partition,每个分区存储在不同的Kafka节点。

Partition

数据分区,用于实现Topic的分布式存储。一个Topic可以划分为多个分区,类似HBASE表可以划分为多个Region。每个分区存储在不同的Kafka节点Broker上。

分区副本

Kafka选用了副本机制来保证数据的安全性。上文也提过每个Kafka分区都可以有不超过节点个数的副本数。

Kafka将分区的多个副本划分为Leader(对外提供读写)和Follower(与Leader同步数据)。如果Leader宕机,Kafka会模拟Zookeeper的公平节点架构,借助Zookeeper重新选举出新的Leader,重新对外提供读写。

Segment

对每个分区的数据进行了更细的划分,先写入的数据会先生成一对Segment文件,存储到一定条件以后,后面数据写入另外一对Segment文件,每个文件就叫Segment文件对(类似KV键值对,成对存在)。

每个Segment是一对(也就是2个)文件:
.index后缀的文件存储文件的索引;.log后缀的文件真正存储数据的日志。

这种设计是为了加快数据检索的效率,将数据按照规则写入不同文件,以后可以根据规则快速的定位数据所在的文件。Segment的名字是这个Segment记录的offset的最小值(类似链表/迭代器的指针,指向下一个元素)。

Offset

Kafka中所有数据的读取都是按照Offset来读取数据,Offset就是每条数据的偏移量(写入分区的顺序)。MQ是FIFO的,先写入的Offset小(第一条=0),后写入的Offset大。

基于offset来指定数据的顺序,消费时就可以按照offset顺序来读取(消费者消费Topic分区中的数据按照offset进行顺序消费)。

生产者往Kafka中写入数据,写入某个分区

每个分区单独管理一套Offset,Offset从0开始对每条数据进行编号。

Kafka写入数据也是按照KV来写入数据:

offset Key	Value

对比

对比项\工具HDFSHBASERedisKafka
第一层(逻辑划分)目录(文件夹)NameSpaceDB0Topic
第二层(逻辑划分)文件TableXX
存储分区BlockRegion分片Partition
分区规则每128M范围槽位自己指定
分区安全副本WAL+HDFS副本副本副本
存储单元X列族XSegment
存储位置硬盘内存memstore+硬盘StoreFile内存内存 + .log + .index
架构NameNode + DataNodeHmaster + HregionServerReids节点Kafka Contorler + Kafka Broker
HA实现ZookeeperKafka Contorler + Kafka:Broker自行配置Kafka Contorler + Kafka:Broker

Kafka集群架构

Zookeeper:辅助选举、元数据存储。
Kafka:分布式主从架构,实现消息队列的构建。

Kafka是分布式主从架构:
主节点Kafka Controller是特殊的Broker(从Broker中选举出来的将才。。。还要负责普通Broker的工作),比普通的Broker多了管理节点的功能(管理从节点的Topic、分区、副本)。启动集群/Controller宕机,都会由Zookeeper辅助选举出Controller。

从节点Kafka Broker对外提供读写请求,还会监听Controller(Controller故障会重新选举)。

部署

官网下载地址

kafka_2.12-2.4.1显然有2个版本号。。。2.12是Scala版本号,2.4.1是Kafka版本号。

第一台机安装

上传安装包至node1:

cd /export/software/
rz

解压并创建日志目录并查看:

tar -zxvf kafka_2.12-2.4.1.tgz -C /export/server/
cd /export/server/kafka_2.12-2.4.1/
mkdir logs

切换目录:

[root@node1 kafka_2.12-2.4.1]# cd config/
[root@node1 config]# ll -ah
总用量 76K
drwxr-xr-x 2 root root 4.0K 33 2020 .
drwxr-xr-x 7 root root  101 530 18:03 ..
-rw-r--r-- 1 root root  906 33 2020 connect-console-sink.properties
-rw-r--r-- 1 root root  909 33 2020 connect-console-source.properties
-rw-r--r-- 1 root root 5.2K 33 2020 connect-distributed.properties
-rw-r--r-- 1 root root  883 33 2020 connect-file-sink.properties
-rw-r--r-- 1 root root  881 33 2020 connect-file-source.properties
-rw-r--r-- 1 root root 2.2K 33 2020 connect-log4j.properties
-rw-r--r-- 1 root root 1.6K 33 2020 connect-mirror-maker.properties
-rw-r--r-- 1 root root 2.3K 33 2020 connect-standalone.properties
-rw-r--r-- 1 root root 1.2K 33 2020 consumer.properties
-rw-r--r-- 1 root root 4.6K 33 2020 log4j.properties
-rw-r--r-- 1 root root 1.9K 33 2020 producer.properties
-rw-r--r-- 1 root root 6.7K 33 2020 server.properties
-rw-r--r-- 1 root root 1.1K 33 2020 tools-log4j.properties
-rw-r--r-- 1 root root 1.2K 33 2020 trogdor.conf
-rw-r--r-- 1 root root 1.2K 33 2020 zookeeper.properties

consumer.properties就是消费者配置文件,producer.properties就是生产者配置文件,server.properties就是Kafka服务配置文件。先修改server.properties

vim server.properties

第21行的唯一id是int类型,用于唯一标识服务端id,node1不用修改。

第60行需要修改日志目录:

log.dirs=/export/server/kafka_2.12-2.4.1/logs

第123行需要修改Zookeeper地址:

zookeeper.connect=node1:2181,node2:2181,node3:2181

末尾添加2个配置,允许删除topic,并且指定当前KafkaServer的主机名:

delete.topic.enable=true
host.name=node1

记得保存。

分发及修改其余节点配置

node1执行:

cd /export/server/
scp -r kafka_2.12-2.4.1 node2:$PWD
scp -r kafka_2.12-2.4.1 node3:$PWD

PWD是大写!!!

node2与node3中分别修改:

vim /export/server/kafka_2.12-2.4.1/config/server.properties

21行的broker.id=1、broker.id=2,末尾的host.name=node2、host.name=node3并保存。。。

添加环境变量:

vim /etc/profile
#KAFKA_HOME
export KAFKA_HOME=/export/server/kafka_2.12-2.4.1
export PATH=:$PATH:$KAFKA_HOME/bin

保存后刷新:

source /etc/profile

为了后续方便使用,可以3台都配置。。。只配置一台也可以。

启动与关闭

由于Kafka依赖Zookeeper,故先启动HDFS、YARN、Zookeeper再启动Kafk。。。

由于已经配置了环境变量,可以直接启动(但是需要指定配置文件路径):

kafka-server-start.sh /export/server/kafka_2.12-2.4.1/config/server.properties

没有配置环境变量则需要使用绝对路径:

/export/server/zookeeper-3.4.6/bin/start-zk-all.sh /export/server/kafka_2.12-2.4.1/config/server.properties

启动后是前台运行的:

[2021-05-30 21:49:38,306] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)

如果需要后台运行需要输出重定向(不要换行,这是一条命令):

kafka-server-start.sh config/server.properties >>/dev/null 2>&1 & >>/dev/null 2>&1 &

关闭:

kafka-server-stop.sh 

当然ctrl+c也行。。。

封装脚本

这么麻烦肯定是不行的。。。还得一台一台启动。。。

启动脚本

vim /export/server/kafka_2.12-2.4.1/bin/start-kafka.sh
#!/bin/bash
KAFKA_HOME=/export/server/kafka_2.12-2.4.1

for number in {1..3}
do
        host=node${number}
        echo ${host}
        /usr/bin/ssh ${host} "cd ${KAFKA_HOME};source /etc/profile;export JMX_PORT=9988;${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties >>/dev/null 2>&1 &"
        echo "${host} started"
done

保存后添加可执行权限:

chmod u+x /export/server/kafka_2.12-2.4.1/bin/start-kafka.sh

关闭脚本

vim /export/server/kafka_2.12-2.4.1/bin/stop-kafka.sh
#!/bin/bash
KAFKA_HOME=/export/server/kafka_2.12-2.4.1

for number in {1..3}
do
  host=node${number}
  echo ${host}
  /usr/bin/ssh ${host} "cd ${KAFKA_HOME};source /etc/profile;${KAFKA_HOME}/bin/kafka-server-stop.sh"
  echo "${host} stoped"
done

保存后添加可执行权限:

chmod u+x /export/server/kafka_2.12-2.4.1/bin/stop-kafka.sh

封装完毕后可以使用start-kafka.sh启动:

[root@node1 ~]# start-kafka.sh 
node1
node1 started
node2
node2 started
node3
node3 started

使用stop-kafka.sh关闭:

[root@node1 ~]# stop-kafka.sh 
node1
node1 stoped
node2
node2 stoped
node3
node3 stoped

Kafka压力测试

当然要先启动Kafka才能进行。。。启动已有脚本了。。。

Kafka自带了压力测试脚本,正式使用前需要测试,防止超过Kafka的峰值导致集群宕机。由于Kafka需要写入和读取到硬盘,故瓶颈在硬盘的读写速度。笔者试试读写400M/S的SSD能支撑多大的并发。

创建Topic

cd /export/server/kafka_2.12-2.4.1/
bin/kafka-topics.sh --create --topic bigdata --partitions 2 --replication-factor 2 --bootstrap-server node1:9092,node2:9092,node3:9092

生产测试

cd /export/server/kafka_2.12-2.4.1/bin/

[root@node1 bin]# kafka-producer-perf-test.sh --topic cdbigdata --num-records 5000000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=node1:9092,node2:9092,node3:9092 acks=1
[2021-05-30 22:03:53,227] WARN [Producer clientId=producer-1] Error while fetching metadata with correlation id 1 : {cdbigdata=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
42177 records sent, 8433.7 records/sec (8.04 MB/sec), 2473.3 ms avg latency, 3454.0 ms max latency.
97920 records sent, 19580.1 records/sec (18.67 MB/sec), 1763.3 ms avg latency, 2353.0 ms max latency.
119984 records sent, 23996.8 records/sec (22.89 MB/sec), 1431.4 ms avg latency, 1929.0 ms max latency.
162592 records sent, 32518.4 records/sec (31.01 MB/sec), 1001.4 ms avg latency, 1202.0 ms max latency.
146032 records sent, 29206.4 records/sec (27.85 MB/sec), 1098.3 ms avg latency, 1327.0 ms max latency.
141072 records sent, 27984.9 records/sec (26.69 MB/sec), 1170.9 ms avg latency, 1314.0 ms max latency.
158768 records sent, 31753.6 records/sec (30.28 MB/sec), 1076.7 ms avg latency, 1253.0 ms max latency.
132528 records sent, 26505.6 records/sec (25.28 MB/sec), 1157.2 ms avg latency, 1470.0 ms max latency.
125072 records sent, 25014.4 records/sec (23.86 MB/sec), 1353.0 ms avg latency, 1574.0 ms max latency.
152160 records sent, 30425.9 records/sec (29.02 MB/sec), 1086.2 ms avg latency, 1191.0 ms max latency.
154320 records sent, 30864.0 records/sec (29.43 MB/sec), 1069.4 ms avg latency, 1174.0 ms max latency.
156672 records sent, 31334.4 records/sec (29.88 MB/sec), 1034.6 ms avg latency, 1208.0 ms max latency.
170416 records sent, 34076.4 records/sec (32.50 MB/sec), 971.6 ms avg latency, 1094.0 ms max latency.
162224 records sent, 32444.8 records/sec (30.94 MB/sec), 1017.6 ms avg latency, 1146.0 ms max latency.
174320 records sent, 34864.0 records/sec (33.25 MB/sec), 955.7 ms avg latency, 1135.0 ms max latency.
167056 records sent, 33411.2 records/sec (31.86 MB/sec), 967.5 ms avg latency, 1211.0 ms max latency.
171808 records sent, 34354.7 records/sec (32.76 MB/sec), 957.6 ms avg latency, 1051.0 ms max latency.
217968 records sent, 43593.6 records/sec (41.57 MB/sec), 751.9 ms avg latency, 950.0 ms max latency.
219248 records sent, 43849.6 records/sec (41.82 MB/sec), 744.9 ms avg latency, 1009.0 ms max latency.
297568 records sent, 59513.6 records/sec (56.76 MB/sec), 569.1 ms avg latency, 891.0 ms max latency.
285152 records sent, 57030.4 records/sec (54.39 MB/sec), 571.0 ms avg latency, 688.0 ms max latency.
290480 records sent, 58096.0 records/sec (55.40 MB/sec), 565.1 ms avg latency, 686.0 ms max latency.
332736 records sent, 66547.2 records/sec (63.46 MB/sec), 498.8 ms avg latency, 733.0 ms max latency.
340080 records sent, 68016.0 records/sec (64.87 MB/sec), 481.8 ms avg latency, 609.0 ms max latency.
335792 records sent, 67131.5 records/sec (64.02 MB/sec), 484.6 ms avg latency, 528.0 ms max latency.
5000000 records sent, 38812.643607 records/sec (37.01 MB/sec), 837.03 ms avg latency, 3454.00 ms max latency, 835 ms 50th, 1408 ms 95th, 1953 ms 99th, 3332 ms 99.9th.

500w条数据,每条1K,虚拟机集群效果差强人意。。。

消费测试

需要Topic先有数据,否则会报错:

WARNING: Exiting before consuming the expected number of messages: timeout (10000 ms) exceeded. You can use the --timeout option to increase the timeout.
2021-05-30 22:13:31:244, 2021-05-30 22:13:41:356, 0.0000, 0.0000, 0, 0.0000, 1622384012202, -1622384002090, -0.0000, -0.0000

使用之前的那个Topic显然很合适:

[root@node1 bin]# kafka-consumer-perf-test.sh --topic cdbigdata --broker-list node1:9092,node2:9092,node3:9092  --fetch-size 1048576 --messages 5000000
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2021-05-30 22:24:39:208, 2021-05-30 22:25:10:412, 4768.3716, 152.8128, 5000000, 160235.8672, 1622384679574, -1622384648370, -0.0000, -0.0031

500w条,读取速度为150M/S,用时160s,差不多也是3w条/s。。。

Topic管理

由于配置了环境变量,直接:

kafka-topics.sh 

就会显示各种帮助。。。

创建

kafka-topics.sh --create --topic bigdata01 --partitions 3 --replication-factor 2 --bootstrap-server node1:9092,node2:9092,node3:9092

其中:

--create:创建
--topic :指定操作的Topic的名称
--partitions:指定分区个数,默认为1
--replication-factor:副本因子,默认为1
--bootstrap-server:指定Kafka服务端地址

列举

kafka-topics.sh --list -bootstrap-server node1:9092,node2:9092,node3:9092

可以看到:

[root@node1 bin]# kafka-topics.sh --list -bootstrap-server node1:9092,node2:9092,node3:9092
__consumer_offsets
bigdata
bigdata01
cdbigdata

查看

拿之前创建的Topic开刀:

kafka-topics.sh --describe --topic bigdata01  --bootstrap-server node1:9092,node2:9092,node3:9092

运行后:

[root@node1 bin]# kafka-topics.sh --describe --topic bigdata01  --bootstrap-server node1:9092,node2:9092,node3:9092  
Topic: bigdata01        PartitionCount: 3       ReplicationFactor: 2    Configs: segment.bytes=1073741824
        Topic: bigdata01        Partition: 0    Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: bigdata01        Partition: 1    Leader: 0       Replicas: 0,1   Isr: 0,1
        Topic: bigdata01        Partition: 2    Leader: 2       Replicas: 2,0   Isr: 2,0

其中:

Partition:分区编号
Replicas:分区副本所在的Kafka Broker ID,每个分区的副本有两种角色(leader副本,follower副本)
Leader:leader 副本所在的Kafka节点
Isr:In-Sync-Replicas:正在同步的副本,可用的副本(用于leader故障时,选举新的leader)

删除

kafka-topics.sh --delete --topic bigdata02  --bootstrap-server node1:9092,node2:9092,node3:9092

生产者消费者测试

构建生产者、消费者对象后,就可以指定Topic和Kafka集群地址,在命令行中生产者发送消息,消费者就可以接收到消息:
node1中:

kafka-console-producer.sh --topic bigdata01 --broker-list node1:9092,node2:9092,node3:9092

node1再新建个会话:

kafka-console-consumer.sh --topic bigdata01 --bootstrap-server node1:9092,node2:9092,node3:9092  --from-beginning

生产者发送:

[root@node1 bin]# kafka-console-producer.sh --topic bigdata01 --broker-list node1:9092,node2:9092,node3:9092
>hehe1
>haha
>oo
>

消费者接收:

[root@node1 ~]# kafka-console-consumer.sh --topic bigdata01 --bootstrap-server node1:9092,node2:9092,node3:9092  --from-beginning   hehe1
haha
oo

其中:

--from-beginning:从每个分区的最初开始消费,默认从最新的offset进行消费

使用该参数可以使用之前的生产者数据。。。但是数据顺序不对(从第一个、第二个、第三个分区读取数据来消费。。。分区内数据是有序的)。。。

ctrl+c强制结束消费者的进程后,再次启动消费者,内容与之前一致。

如果没有这个参数,就是直接从最新的偏移量读取,会丢失之前的数据。可以取消这个参数试试:

[root@node1 bin]# kafka-console-producer.sh --topic bigdata01 --broker-list node1:9092,node2:9092,node3:9092
>hehe1
>haha
>oo
>ss
>
[root@node1 ~]# kafka-console-consumer.sh --topic bigdata01 --bootstrap-server node1:9092,node2:9092,node3:9092  
ss

Java API

生产者API

在这里插入图片描述
Diamond types are not supported at language level '5'这种问题很常见。。。
在这里插入图片描述
art+enter选设置leve to 7 或者:
在这里插入图片描述
左上角File→Project Structure→Modules→Language level选8号即可。

package com.aa;

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaClientProducerTest {
    public static void main(String[] args) {
        //构建配置管理对象的实例
        Properties properties = new Properties();
        //指定Kafka集群的地址
        properties.put("bootstrap.servers","node1:9092,node2:9092,node3:9092");

        /*
        acks的参数表示生产者生产数据到Kafka的方式
        0:生产者将数据提交给Kafka,不管Kafka有没有写入成功,都直接发送下一条。
            快、数据丢失的概率比较高
        1:生产者将数据提交给Kafka,Kafka将数据写入对应分区的leader分区后,返回一个ack给生产者,
            生产者发送下一条。有一定的数据丢失风险,如果follower没来得及跟leader同步,leader故障,
            数据会丢失。
            如果生产者长时间没有收到ack,就认为数据写入失败,重试写入数据,直到收到Kafka的ack,
            再发送下一条。如果ack丢失,就会导致数据重复的问题。
        all或-1:生产者将数据提交给Kafka,Kafka将数据写入对应分区的leader分区后,等待所有Follower同步成功,
        返回一个ack给生产者,生产者发送下一条。安全但是慢。
         */
        properties.put("acks","all");
        //指定KV序列化类型
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        //构建生产者的实例对象,用于生产数据到Kafka,需要加载生产者配置
        Producer<String, String> producer = new KafkaProducer<>(properties);

        //生产数据到kafka:send
        //生产的数据对象:ProducerRecord类型

        for (int i = 0; i < 10; i++){
            //方式一:指定topic,key,value             --根据Key的编码取余分区个数
//            producer.send(new ProducerRecord<String, String>("bigdata01", i+"", "fafa"+i));
            //方式二:指定topic,value                 --全部写入某一个分区
//            producer.send(new ProducerRecord<String,String>("bigdata01","hehe"+i));
            //方式三:指定topic,partition,key,value  -- 指定写入分区
            producer.send(new ProducerRecord<String,String>("bigdata01",0,i+"","haha"+i));
        }

        //关闭生产者
        producer.close();
    }
}

消费者API

package com.aa;


import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class KafkaClientConsumerTest {
    public static void main(String[] args) {
        //构建配置管理对象的实例
        Properties properties = new Properties();
        //指定Kafka集群的地址
        properties.put("bootstrap.servers","node1:9092,node2:9092,node3:9092");

        //指定消费者属于哪个组
        properties.setProperty("group.id", "test01");
        //自动提交
        properties.setProperty("enable.auto.commit", "true");
        //自动提交的时间间隔
        properties.setProperty("auto.commit.interval.ms", "1000");
        //指定KV的反序列化的类
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        //构建消费者对象,加载消费配置
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        //订阅Topic
        consumer.subscribe(Arrays.asList("bigdata01"));
        //从Kafka中拉取Topic的数据
        while (true) {
            //实时不断从Kafka中拉取数据
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            //取出本次拉取的每一条数据
            for (ConsumerRecord<String, String> record : records){
                //获取topic
                String topic = record.topic();
                //获取分区
                int part = record.partition();
                //获取offset
                long offset = record.offset();
                //获取KV
                String key = record.key();
                String value = record.value();
                //处理
                System.out.println(topic+"\t"+part+"\t"+offset+"\t"+key+"\t"+value);
            }
        }
    }
}

测试

遇到了报错:
Error:java: Compilation failed: internal java compiler error。。。

pom.xml需要配置:

<repositories>
        <repository>
            <id>aliyun</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
    </repositories>
    <dependencies>
        <!-- Kafka的依赖 -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.4.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>2.4.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

锁定版本JDK1.8后终于能运行了:
在这里插入图片描述

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐