消息队列 - kafka
kafka是大数据,大并发的杀手锏kafka+ zookeeperkafka官网:https://kafka.apache.org/一、为什么使用消息队列把同步变成异步操作。使用同步的通信方式来解决多个服务之间的通信同步存在的问题:造成响应时间较长,用户体验差可能中间某一个环节失败,导致整体都失败了使用异步的通信方式来解决多个服务之间的通信在业务的上游(下单)和下游(积分,数据库,优惠券等)之间建
文章目录
kafka是大数据,大并发的杀手锏
kafka + zookeeper
kafka官网:https://kafka.apache.org/
一、为什么使用消息队列
把同步变成异步操作。
1.1使用同步的通信方式来解决多个服务之间的通信
同步存在的问题:
- 造成响应时间较长,用户体验差
- 可能中间某一个环节失败,导致整体都失败了
1.2使用异步的通信方式来解决多个服务之间的通信
在业务的上游(下单)和下游(积分,数据库,优惠券等)之间建立一个消息队列;
异步的优势:
- 明显提升接口的吞吐量(响应速度等)
- 下游某一个服务失败,也可以通过分布式事务来保证操作成功
针对同步的通信方式来说,异步的方法,可以让上游快速成功,极大的提高了系统的吞吐量。而且在分布式系统中,通过下游多个服务的分布式事务,也能保障业务执行之后的最终一致性。
二、消息队列的基本信息
MQ(MESSAGE QUEUE)
消息队列解决的是通信问题,使用的是生产者/消费者模型。
2.1消息队列的流派
rabbitMQ,rocketMQ, kafka,zeroMQ等等消息队列中间件,分成了有无Broker(进行消息分发进队列的功能)的MQ
kafka 是全球消息处理性能最快的一款MQ
2.1.1有broker的mq
通过有一台服务器作为broker,所有的消息都通过他来中转。生产者吧消息发送给它就结束了自己的任务,broker把消息给队里,再有队列推送给消费者(或者消费者主动轮询获得消息)
2.1.2无broker的mq
zeroMQ
三、kafka
3.1介绍
是一个分布式、支持分区(partition)、多副本(replica),给予zookeeper协调的分布式消息系统。
可以实现:
- 基于zookeeper的批处理系统
- 低延迟的实时系统
- storm/Spark流水处理引擎
- web/nginx日志
- 访问日志
- 消息服务等功能
使用场景:
3.2kafaka的安装
前提需要有jdk,以及部署一台zookeeper的服务器
kafka官网:https://kafka.apache.org/
可以下载2.4.1的版本使用
1.在usr/local中创建一个文件夹
2.吧下载的kafka安装包放进文件夹中,然后解压
3.删除压缩包
4.加压成功后,进入文件夹中查看
5.在config里面关注一下这个文件 server.properties
vim server.properties查看配置
6. 启动kafka
进入到bin的文件夹里面,然后输入:
kafka-server-start.sh -daemon …/config/server.properties
表示:带着配置文件启动kafka
- 查看启动成功(前提安装了jdk)
进程:ps -aux | grep server.properties
- 再去zookeeper服务器那边查看是否有kafka的服务
.s/ 命令里面有一个brokers
.s /brokers/ids 查看id为几的节点kafka以及上线
3.3kafka中的一些基本概念
kafka创建的topic主题是存储在zookeeper上的。
消息才是保存在kafka上的usr/local/kafka/data/kafka-logs/test-0/0000000.log(日志路径/topic-id中)文件里面
3.4 消息的偏移量及顺序消费原理
- 生产者将消息发送给broker,broker会将消息保存在本地的日志文件中(消息才是保存在kafka上的usr/local/kafka/data/kafka-logs/test-0/0000000.log(日志路径/topic-id中)文件里面)
- 消息的保存是有序的,通过offset偏移量来描述消息的有序性
- 消费者消费消息时也是通过offset来描述当前要消费的消息的位置
3.5单播消息和多播消息
3.5.1单播消息
问:在一个kafka的topic中,启动两个消费者,那生产者发送消息,是否同时被这这两消费者消息吗?
答:如果在同一个消费组,name只有一个消费组可以收到消息;换言之,同一个消费组中 一个topic中的消息 只能有一个消费者收到
目的:为了保证消费的顺序
3.5.2多播消息
不同的消费组订阅同一个topic,那么每个消费组中只能有一个消费者收到消息;
单,多播 图解:
3.6 查看消费组的详细信息
3.7主体、分区的概念
3.7.1主体topic
是一个逻辑的概念,kafka通过topic将消息进行分类。不同的topic会被订阅该topic的消费组消费。
但是有一个问题,假如这个topic中的消息非常的多,多到需要几T存储,因为消息会被保存在log日志文件中的,此时,为了解决文件过大的问题,kafka提出了partition分区的概念,
3.7.2分区partition
分区的概念
通过partition将一个topic中的消息分区存储,这样的好处有多个
- 分区存储,可以解决统一存储文件过大的问题
- 提供了读写的吞吐量,读和写可以挺尸在多个分区进行
创建多分区的主题
3.8 kafka中消息日志文件中保存的内容
- v00000.log:这个文件中保存的就是消息
- _consumer_offset-49:
kafka内部自己创建了_consimer_offsets主题包含了50个分区,这个主题用来存放消费者消费某个主题的偏移量(告诉消息消费到哪一条了)
举例:假如消费者1在消费第50个消息的时候挂了,然后消费者2继续消费是从51开始的,原理如上图:会在_consimer_offsets里面存储消费的offset(偏移量信息) + _consimer_offsets默认有50个分区。
文件中保存的消息,默认保存7天,7天后会被删除。
3.9 kafka集群及副本的概念
3.9.1.搭建kafka集群,3个broker
准备3个server.properties⽂件
每个⽂件中的这些内容要调整
- server.properties
broker.id=0
listeners=PLAINTEXT://192.168.65.60:9092
log.dir=/usr/local/data/kafka-logs
- server1.properties
broker.id=1
listeners=PLAINTEXT://192.168.65.60:9093
log.dir=/usr/local/data/kafka-logs-1
- server2.properties
broker.id=2
listeners=PLAINTEXT://192.168.65.60:9094
log.dir=/usr/local/data/kafka-logs-2
使⽤如下命令来启动3台服务器
./kafka-server-start.sh -daemon ../config/server0.properties
./kafka-server-start.sh -daemon ../config/server1.properties
./kafka-server-start.sh -daemon ../config/server2.properties
搭建完后通过查看zk中的/brokers/ids 看是否启动成功
3.9.2 副本的概念
副本是对分区的备份。在集群中,不同的副本会被部署在不同的briker上。
副本是为了主题中的分区创建多个备份,多个副本子啊kafka集群的偶的个broker中,会有一个副本作为leader,其他都是follower
下面例子:创建一个主题,两个分区,三个副本。
注意:localhost是zk的ip地址,输入命令创建和查看的命令
./kafka-topics.sh --create --zookeeper 172.16.253.35:2181 --replicationfactor 3 --partitions 2 --topic my-replicated-topic
通过查看主题信息,其中的关键数据:
-
replicas:
当前副本存在的broker节点 -
leader:副本⾥的概念
每个partition都有⼀个broker作为leader。
消息发送⽅要把消息发给哪个broker?就看副本的leader是在哪个broker上⾯。副本⾥的leader专⻔⽤来接收消息。
接收到消息,其他follower通过poll的⽅式来同步数据。
消费者也是消费leader的broker的消息 -
follower:leader处理所有针对这个partition的读写请求,⽽follower被动复制leader,不提供读写(主要是为了保证多副本数据与消费的⼀致性),如果leader所在的broker挂掉,那么就会进⾏新leader的选举,⾄于怎么选,在之后的controller的概念中介绍。
-
isr:可以同步的broker节点和已同步的broker节点,存放在isr集合中;如果isr中的节点服务器性能差,会被踢出isr集合。
broker、topic主题。partition分区。replication副本之间的关系是如何的呢?
集群中有多个broker,创建主题时可以致命主题的分区数(把消息拆分到不同的分区中存储),可以为分区创建多个副本,不同的副本存放在不同的broker里.
四、kafka的集群使用
4.1命令
1)向集群发送消息:
2)从集群中消费消息:
3)指定消费组消费消息
4)分区分消息组的集群消费中的细节
图中kafka集群有两个broker,每个broker中有多个partition。一个partition只能被一个消费组中的某一个消费组消费,从而保证消费顺序。 但是,卡发卡只在prtition的范围内保证消息消费的局部顺序,不能再不同的partition内保证总的消费顺序。
一个消费组可以消费多个partition
消费组中消费者的数量不能比一个topic中的partition数量多,否则多出的消费组消费不到消息的(由于单边播放原则)
4.2 代码的使用
4.2.1 客户端(生产者)的实现
4.2.1.1 生产者的基本实现
1.依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
2.具体实现
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class MySimpleProducer {
private final static String TOPIC_NAME = "my-replicated-topic";
public static void main(String[] args) throws ExecutionException,InterruptedException {
//1.设置参数,存放键值对的,相当于map
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"172.16.253.38:9092,172.16.253.38:9093,172.16.253.38:9094");
//把发送的key从字符串序列化为字节数组
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
//把发送消息value从字符串序列化为字节数组
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
//2.创建⽣产消息的客户端,传⼊参数
Producer<String,String> producer = new KafkaProducer<String, String>(props);
//3.创建消息
// 参数一:主题名, 参数二: key,作⽤是决定了往哪个分区上发,参数三:value,具体要发送的消息内容
// 具体的发送分区就算公司:hash(key)%partitionNum , 或者直接
ProducerRecord<String,String> producerRecord = new ProducerRecord<> (TOPIC_NAME,"mykeyvalue","hellokafka");
// new ProducerRecord<> (TOPIC_NAME, 0 ,"mykeyvalue","hellokafka"); 也可以指定分区id
//4.发送消息,得到消息发送的元数据并输出
RecordMetadata metadata = producer.send(producerRecord).get();
System.out.println("同步⽅式发送消息结果:"+"topic-"+metadata.topic() +"|partition-"
+ metadata.partition()+"|offset-"+metadata.offset());
producer.close();
}
}
4.2.1.2 生产者的同步发送消息
生产者同步发送消息,在收到kafka的ack告知发送成功之前一直处于阻塞状态;
通过配置我们知道:阻塞3S的时间,如果还没有收到ack确定,会重试3次,还没有就报错了抛InterruptedException异常,我们可以捕获处理
// 发送消息,得到消息发送的元数据并输出
RecordMetadata metadata = producer.send(producerRecord).get(); // 同步发送
4.2.1.3 生产者的异步发送消息
不需要等待kafka返回一个ack信息给生产者,直接默认成功,发送完消息后就可以直接执行后面的业务,kafka的breker在收到消息后异步调用生产者提供的callback回调方法。
异步发送 是指使用 异步回调方式 发送消息
问题:那发送消息使用异步多还是同步多? – 同步
虽然异步会提升性能,但是不能保证消息是否不会丢失,且发送消息用异步性能提升不明显,所以使用同步发送消息较多。
4.2.1.4 生产者中ack的配置
在同步发送的前提下,生产者在获得集群返回的ack之前会一直阻塞。那么集群什么时候返回ack呢?此时ack有三个配置:
- ack= 0, kafka-clister不需要任何的broker收到消息,就立即返回ack给生产者,最容易丢失消息,但是效率最高。
- ack= 1(默认),多副本之间的leader已经收到消息,并把消息写入到本地的log中(00000.log),才会返回ack给生产者,性能和安全性是最均衡的。
- ack = -1/all,里面的配置min.insync.replicas=2(默认为1,推荐配置大于等于2),此时就需要 leader写入本地log中和一个follower同步完成后,才会返回ack给生产者(此时集群中有2【前面设置的min.insync.replicas】个broker已完成数据的接收),这种方式最安全,但是性能最差。
下⾯是关于ack和重试(如果没有收到ack,就开启重试)的配置
props.put(ProducerConfig.ACKS_CONFIG, "1"); // 设置ack = 1
/*
发送失败会重试,默认重试间隔100ms,重试能保证消息发送的可靠性,但是也可能造成消息重复发送,
⽐如⽹络抖动,所以需要在接收者那边做好消息接收的**幂等性处理**
*/
props.put(ProducerConfig.RETRIES_CONFIG, 3); // 重试次数
//重试间隔设置,ms
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);
重试发送消息,此时可能会有重复发送的问题,解决方法见下;
4.2.1.5 关于消息发送的缓冲区
消息不是一条一条的发送到kafka;
消息从生产者到kafka之间有缓存区;
代码:
4.2.2 客户端(消费者)的实现
4.2.2.1 基本的消费组代码实现
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class MySimpleConsumer {
private final static String TOPIC_NAME= "my-replicated-topic";
private final static String CONSUMER_GROUP_NAME ="testGroup";
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"172.16.253.38:9092,172.16.253.38:9093,172.16.253.38:9094");
//消费分组名,及其序列化
props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
//1.创建⼀个消费者的客户端
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
//2.消费者订阅主题列表
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while (true) {
/*
* 3.poll() API是拉取消息的⻓轮询
*/
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) { // 消费poll下来的消息
//4.打印消息
System.out.printf("收到消息:partition = %d,offset = %d, key =%s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
}
}
}
}
4.2.2.2 关于消费者自动提交和手动提交offset偏移量
1)提交的内容
消费者无论是自动提交还是手动提交,都需要把所属的消费组+消费的某个主题+消费的某个分区及消费的偏移量,这样的信息提交到集群的_consumer_offsets主题里面。
2)自动提交
消费组poll消息下来以后就会自动提交offset
// 是否⾃动提交offset,默认就是true
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
//⾃动提交offset的间隔时间
props.put(ConsumerConfig.AUTO_COM`在这里插入代码片`MIT_INTERVAL_MS_CONFIG, "1000");
注意:自动提交会丢消息。因为消费者在消费之前提交offset,有可能提交完成后还没有消费时消费者就挂了。
3)手动提交
需要把自动提交的配置改成fasle
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, “false”);
手动提交分为两种
-
手动同步提交(常用)
在消费完消息后调用同步提交的方法,当集群返回ack前一直阻塞,返回ack后表示提交成功,执行之后的逻辑
consumer.commitSync();此时会阻塞,等ack -
手动异步提交
在消息消费完后提交,不需要等到集群ack,直接执行之后的逻辑,可以设置一个回调方法,供集群调用
consumer.commitASync();此时不会阻塞
一般使用同步提交就可以了。
4.2.2.3 长轮询poll消息
默认情况下,消费者一次会poll500条消息。
//⼀次poll最⼤拉取消息的条数,可以根据消费速度的快慢来设置
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
代码中设置了⻓轮询的时间是1000毫秒
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
意味着:
- 如果⼀次poll到500条,就直接执⾏for循环
- 如果这⼀次没有poll到500条。且时间在1秒内,那么⻓轮询继续poll,要么到500条,要么到1s
- 如果多次poll都没达到500条,且1秒时间到了,那么直接执⾏for循环
如果两次poll的间隔超过30s,集群会认为该消费者的消费能⼒过弱,该消费者被踢出消费组,触发rebalance机制,rebalance机制会造成性能开销。可以通过设置这个参数,让⼀次poll的消息条数少⼀点
//⼀次poll最⼤拉取消息的条数,可以根据消费速度的快慢来设置
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
//如果两次poll的时间如果超出了30s的时间间隔,kafka会认为其消费能⼒过弱,将其踢出消费组。将分区分配给其他消费者。-rebalance
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30*1000);
4.2.2.4 消费者的健康状态检查
消费者每隔1s向kafka集群发送⼼跳,集群发现如果有超过10s没有续约的消费者,将被踢出消费组,触发该消费组的rebalance机制,将该分区交给消费组⾥的其他消费者进⾏消费。
//consumer给broker发送⼼跳的间隔时间
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
//kafka如果超过10秒没有收到消费者的⼼跳,则会把消费者踢出消费组,进⾏rebalance,把分区分配给其他消费者。
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10*1000);
4.2.2.5 指定分区和偏移量、时间消费
-
指定分区消费
-
从头消费(回溯消费)
-
指定offset消费
-
指定时间点开始消费
根据时间,去所有的partition中确定该时间对应的offset,然后去所有的partition中找到该offset之后的消息开始消费。
4.2.2.6 新消费组的消费offset偏移量 规则新消费组中的消费者在启动以后,
默认会从当前分区的最后⼀条消息的offset+1开始消费(消费新消息)。可以通过以下的设置,让新的消费者第⼀次从头开始消费。之后开始消费新消息(最后消费的位置的偏移量+1)
- Latest:默认的,消费新消息(只消费启动之后发送到主题的消息)
- earliest:第⼀次从头开始消费。之后开始消费新消息(最后消费的位置的偏移量+1)
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
五、springboot中使用kafka
5.1 基本使用
1.引入依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
2.编写配置文件
- 编写消息生产者
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/msg")
public class MyKafkaController {
private final static String TOPIC_NAME = "my-replicated-topic";
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
@RequestMapping("/send")
publicStringsendMessage(){
kafkaTemplate.send(TOPIC_NAME,0,"key","this is a message!");
return "send success!";
}
}
- 编写消息消费者
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
@Component
publicclassMyConsumer {
@KafkaListener(topics="my-replicated-topic",groupId="MyGroup1") // 监听主题,属于的消费组
public void listenGroup(ConsumerRecord<String, String> record, Acknowledgment ack) { // record是指一条消息记录
String value =record.value();
System.out.println(value);
System.out.println(record);
//⼿动提交offset,自动提交,这个ack没有作用。
ack.acknowledge();
}
@KafkaListener(topics="my-replicated-topic",groupId="MyGroup2") // 监听主题,属于的消费组
public void listensGroup(ConsumerRecord<String, String> records, Acknowledgment ack) { // record是指一批消息记录
records进行操作
//⼿动提交offset
ack.acknowledge();
}
}
- 拓展 - 消费者中配置消费主题、分区和偏移量
@KafkaListener(groupId="testGroup", topicPartitions= {
@TopicPartition(topic="topic1", partitions= {"0", "1"}), // topic1主题的0,1分区
@TopicPartition(topic="topic2", partitions="0", // topic2主题的0 分区
partitionOffsets=@PartitionOffset(partition="1",initialOffset="100"))} // 对于任意主题的1号分区,从100偏移量开始消费
, concurrency="3") //concurrency就是同组下创建的消费者个数,就是并发消费数,建议⼩于等于分区总数
public void listenGroupPro(ConsumerRecord<String, String> record, Acknowledgment ack) {
String value = record.value();
System.out.println(value);
System.out.println(record);
//⼿动提交offset
ack.acknowledge();
}
5.2 kafka集群中controller、Rebalance 、 HW
5.2.1 Controller
-
集群中谁来充当controller
每个broker启动时会在zk创建一个临时序号节点,获得的序号最小的那个broker将会作为集群中的controller(所以很多时候是0的broker当controller) -
controller的作用:负责管理整个集群中所有分区和副本的状态。
- 当集群中有一个副本的leader挂掉,需要在集群中选举出一个新的leader,选举的规则是从isr集合中最左边获得
- 当集群中有broker新增或减少,controller会同步信息给其他的broker
- 当集群中有分区新增或减少,controller会同步信息给其他的broker
5.2.2 Rebalance机制
- 前提:消费组中的消费者没有指明分区来消费
- 触发条件:当消费组中的消费者和分区的关系发生变化的时候
- 分区分配策略:在rebalance之前,分区真么分配会有这么三种策略
-
range:根据公式计算得到某个消费者 消费哪个分区;前面的消费者是 分区总数/消费者数量 + 1,之后的消费者是分区总数/消费者数量
-
轮询:大家轮着来(消费者1消费p036,消费者2消费p147,消费者3消费25)
-
sticky:粘合策略,如果需要rebalance,会在之前已分配的基础上调整,不会改变之前的分配情况。如果这个策略没开,那么就进行全部的重新分配,建议开启。
-
5.2.3 HW和LEO
LEO是某个副本最后消息的消息位置(log-end-offset)
HW俗称高水位 ,是已完成同步的位置。消息在写⼊broker时,且每个broker完成这条消息的同步后,hw才会变化。
在这之前消费者是消费不到这条消息的。在同步完成之后,HW更新之后,消费者才能消费到这条消息,这样的⽬的是防⽌消息的丢失。
作用:防止重复消费
六、kafka线上问题优化
6.1 如何防止消息丢失
生产者:使用同步发送消息 + 把ack设置成1或者all + 设置同步的分区数 >=2
消费者:把自动提交改成手动提交
6.2 如何防止重复消费
在防止消息丢失的方案中,如果生产者发送完消息后,因为网络抖动,没有收到ack,但实际上broker已经收到了。
此时,生产者会进行重试,于是broker就会收到多条相同的消息,而照成消费者的重复消费。
解决:
- 生产者关闭重试:会造成丢失消息(不建议)
- 消费者解决非幂等性消息问题:
所谓幂等性:多次访问的结果都是一样的。对于rest的请求(get、put、delete幂等,post非幂等)
方案:
1. 在数据库中创建联合主键,防止相同的主键创建出多条记录
2. 使用分布式锁,以业务id为锁,保证只有一条记录能创建成功
6.3 如何做到消息的顺序消费
-
生产者:保证消息按顺序消费,且消息不丢失 – 使用同步发送,ack设置成非0的值
-
消费者:主题只能设置一个分区,消费组中只能有一个消费者(1对1)
此时,是牺牲了性能。
6.4 如何解决消息积压问题
消息积压(生产能力远大于消费能力)会导致很多问题,比如磁盘呗打满、生产端发消息导致kafka性能过慢,就容易出现服务雪崩,就需要有相应的手段:
- 方案1:在一个消费者中启动多线程,让多个线程同时消费。 ----- 提升消费者的消费能力
- 方案2:如果方案1还不够,这个时候可以启动多个消费者,多个消费者部署在不同服务器上。
- 方案3: 让一个消费者去吧收到的消息往另一个topic上发,另一个topic设置多个分区和多个消费者,进行具体的业务消费
6.5 延时队列的实现
延迟队列的应用场景:在订单创建成功后如果超过30分钟没有付款,则需要取消订单
- 创建多个topic每个topic标识延时的间隔
1.topic_5s:延时5s执行的队列
2.topic_1m:
3.topic_30m: - 消息发送者发送消息到相应的topic,并带上消息的发送时间
- 消费者订阅相应的topic,消费是轮询消费整个topic中的信息
1.如果消息的发送时间,和消费的当前时间超过预设的值,比如30分钟
2.如果消息的发送时间,和消费的当前时间没超过预设的值,则不消费当前的offset及之后的offset的所有消息都消费
3.下次继续消费改offset出的消息,判断时间是否以满足预设值
七、kafka-eagle 监控平台
1.去kafka-eagle官网下载压缩包 : https://www.kafka-eagle.org/
2.分配一台虚拟机,安装jdk
3.解压缩第一步的压缩包
4.给kafka-eagle配置环境变量,修改zk和数据库的地址
启动查看
看到
请不要吝啬你发财的小手,点赞收藏评论,谢谢!
请不要吝啬你发财的小手,点赞收藏评论,谢谢!
请不要吝啬你发财的小手,点赞收藏评论,谢谢!
更多推荐
所有评论(0)