文章目录

kafka是大数据,大并发的杀手锏
kafka + zookeeper

kafka官网:https://kafka.apache.org/

一、为什么使用消息队列

把同步变成异步操作。

1.1使用同步的通信方式来解决多个服务之间的通信

在这里插入图片描述

同步存在的问题:

  • 造成响应时间较长,用户体验差
  • 可能中间某一个环节失败,导致整体都失败了

1.2使用异步的通信方式来解决多个服务之间的通信

在业务的上游(下单)和下游(积分,数据库,优惠券等)之间建立一个消息队列;

在这里插入图片描述

异步的优势:

  • 明显提升接口的吞吐量(响应速度等)
  • 下游某一个服务失败,也可以通过分布式事务来保证操作成功

针对同步的通信方式来说,异步的方法,可以让上游快速成功,极大的提高了系统的吞吐量。而且在分布式系统中,通过下游多个服务的分布式事务,也能保障业务执行之后的最终一致性。

二、消息队列的基本信息

MQ(MESSAGE QUEUE)

消息队列解决的是通信问题,使用的是生产者/消费者模型。

2.1消息队列的流派

rabbitMQ,rocketMQ, kafka,zeroMQ等等消息队列中间件,分成了有无Broker(进行消息分发进队列的功能)的MQ

kafka 是全球消息处理性能最快的一款MQ

2.1.1有broker的mq

通过有一台服务器作为broker,所有的消息都通过他来中转。生产者吧消息发送给它就结束了自己的任务,broker把消息给队里,再有队列推送给消费者(或者消费者主动轮询获得消息)
rabbitMQ、 kafka、rocketMQ

2.1.2无broker的mq

zeroMQ

在这里插入图片描述

三、kafka

3.1介绍

是一个分布式、支持分区(partition)、多副本(replica),给予zookeeper协调的分布式消息系统。
可以实现:

  • 基于zookeeper的批处理系统
  • 低延迟的实时系统
  • storm/Spark流水处理引擎
  • web/nginx日志
  • 访问日志
  • 消息服务等功能

使用场景:
在这里插入图片描述

3.2kafaka的安装

前提需要有jdk,以及部署一台zookeeper的服务器

kafka官网:https://kafka.apache.org/
可以下载2.4.1的版本使用

1.在usr/local中创建一个文件夹

在这里插入图片描述
2.吧下载的kafka安装包放进文件夹中,然后解压

在这里插入图片描述
3.删除压缩包

在这里插入图片描述
4.加压成功后,进入文件夹中查看

在这里插入图片描述

5.在config里面关注一下这个文件 server.properties在这里插入图片描述

vim server.properties查看配置

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
6. 启动kafka
进入到bin的文件夹里面,然后输入:
kafka-server-start.sh -daemon …/config/server.properties

表示:带着配置文件启动kafka

  1. 查看启动成功(前提安装了jdk)

进程:ps -aux | grep server.properties

  1. 再去zookeeper服务器那边查看是否有kafka的服务
    .s/ 命令里面有一个brokers
    .s /brokers/ids 查看id为几的节点kafka以及上线

3.3kafka中的一些基本概念

在这里插入图片描述

kafka创建的topic主题是存储在zookeeper上的。
消息才是保存在kafka上的usr/local/kafka/data/kafka-logs/test-0/0000000.log(日志路径/topic-id中)文件里面

3.4 消息的偏移量及顺序消费原理

在这里插入图片描述

  • 生产者将消息发送给broker,broker会将消息保存在本地的日志文件中(消息才是保存在kafka上的usr/local/kafka/data/kafka-logs/test-0/0000000.log(日志路径/topic-id中)文件里面)
  • 消息的保存是有序的,通过offset偏移量来描述消息的有序性
  • 消费者消费消息时也是通过offset来描述当前要消费的消息的位置

3.5单播消息和多播消息

3.5.1单播消息

问:在一个kafka的topic中,启动两个消费者,那生产者发送消息,是否同时被这这两消费者消息吗?
答:如果在同一个消费组,name只有一个消费组可以收到消息;换言之,同一个消费组中 一个topic中的消息 只能有一个消费者收到

目的:为了保证消费的顺序

在这里插入图片描述

3.5.2多播消息

不同的消费组订阅同一个topic,那么每个消费组中只能有一个消费者收到消息;

在这里插入图片描述

单,多播 图解:
在这里插入图片描述

3.6 查看消费组的详细信息

在这里插入图片描述

3.7主体、分区的概念

3.7.1主体topic

是一个逻辑的概念,kafka通过topic将消息进行分类。不同的topic会被订阅该topic的消费组消费。

但是有一个问题,假如这个topic中的消息非常的多,多到需要几T存储,因为消息会被保存在log日志文件中的,此时,为了解决文件过大的问题,kafka提出了partition分区的概念,

3.7.2分区partition

分区的概念

在这里插入图片描述
通过partition将一个topic中的消息分区存储,这样的好处有多个

  • 分区存储,可以解决统一存储文件过大的问题
  • 提供了读写的吞吐量,读和写可以挺尸在多个分区进行
创建多分区的主题

在这里插入图片描述

3.8 kafka中消息日志文件中保存的内容

  • v00000.log:这个文件中保存的就是消息
  • _consumer_offset-49:
    kafka内部自己创建了_consimer_offsets主题包含了50个分区,这个主题用来存放消费者消费某个主题的偏移量(告诉消息消费到哪一条了)

在这里插入图片描述
举例:假如消费者1在消费第50个消息的时候挂了,然后消费者2继续消费是从51开始的,原理如上图:会在_consimer_offsets里面存储消费的offset(偏移量信息) + _consimer_offsets默认有50个分区。

在这里插入图片描述

文件中保存的消息,默认保存7天,7天后会被删除。

3.9 kafka集群及副本的概念

3.9.1.搭建kafka集群,3个broker

准备3个server.properties⽂件

每个⽂件中的这些内容要调整

  • server.properties
broker.id=0
listeners=PLAINTEXT://192.168.65.60:9092
log.dir=/usr/local/data/kafka-logs
  • server1.properties
broker.id=1
listeners=PLAINTEXT://192.168.65.60:9093
log.dir=/usr/local/data/kafka-logs-1
  • server2.properties
broker.id=2
listeners=PLAINTEXT://192.168.65.60:9094
log.dir=/usr/local/data/kafka-logs-2

使⽤如下命令来启动3台服务器

./kafka-server-start.sh -daemon ../config/server0.properties
./kafka-server-start.sh -daemon ../config/server1.properties
./kafka-server-start.sh -daemon ../config/server2.properties

搭建完后通过查看zk中的/brokers/ids 看是否启动成功

3.9.2 副本的概念

副本是对分区的备份。在集群中,不同的副本会被部署在不同的briker上。

副本是为了主题中的分区创建多个备份,多个副本子啊kafka集群的偶的个broker中,会有一个副本作为leader,其他都是follower

下面例子:创建一个主题,两个分区,三个副本。

注意:localhost是zk的ip地址,输入命令创建和查看的命令

./kafka-topics.sh --create --zookeeper 172.16.253.35:2181 --replicationfactor 3 --partitions 2 --topic my-replicated-topic

在这里插入图片描述

通过查看主题信息,其中的关键数据:

  • replicas:
    当前副本存在的broker节点

  • leader:副本⾥的概念
    每个partition都有⼀个broker作为leader。
    消息发送⽅要把消息发给哪个broker?就看副本的leader是在哪个broker上⾯。副本⾥的leader专⻔⽤来接收消息。
    接收到消息,其他follower通过poll的⽅式来同步数据。
    消费者也是消费leader的broker的消息

  • follower:leader处理所有针对这个partition的读写请求,⽽follower被动复制leader,不提供读写(主要是为了保证多副本数据与消费的⼀致性),如果leader所在的broker挂掉,那么就会进⾏新leader的选举,⾄于怎么选,在之后的controller的概念中介绍。

  • isr:可以同步的broker节点和已同步的broker节点,存放在isr集合中;如果isr中的节点服务器性能差,会被踢出isr集合。

在这里插入图片描述

broker、topic主题。partition分区。replication副本之间的关系是如何的呢?

集群中有多个broker,创建主题时可以致命主题的分区数(把消息拆分到不同的分区中存储),可以为分区创建多个副本,不同的副本存放在不同的broker里.

四、kafka的集群使用

4.1命令

1)向集群发送消息:

在这里插入图片描述

2)从集群中消费消息:
在这里插入图片描述

3)指定消费组消费消息

在这里插入图片描述

4)分区分消息组的集群消费中的细节

在这里插入图片描述
图中kafka集群有两个broker,每个broker中有多个partition。一个partition只能被一个消费组中的某一个消费组消费,从而保证消费顺序。 但是,卡发卡只在prtition的范围内保证消息消费的局部顺序,不能再不同的partition内保证总的消费顺序。
一个消费组可以消费多个partition
消费组中消费者的数量不能比一个topic中的partition数量多,否则多出的消费组消费不到消息的(由于单边播放原则)

4.2 代码的使用

4.2.1 客户端(生产者)的实现

4.2.1.1 生产者的基本实现

1.依赖

<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-clients</artifactId>
	<version>2.4.1</version>
</dependency>

2.具体实现

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class MySimpleProducer {

	private final static String TOPIC_NAME = "my-replicated-topic";
	
	public static void main(String[] args) throws ExecutionException,InterruptedException {
		//1.设置参数,存放键值对的,相当于map
		Properties props = new Properties();
		props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"172.16.253.38:9092,172.16.253.38:9093,172.16.253.38:9094");
		
		//把发送的key从字符串序列化为字节数组
		props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
		
		//把发送消息value从字符串序列化为字节数组
		props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
		
		//2.创建⽣产消息的客户端,传⼊参数
		Producer<String,String> producer = new KafkaProducer<String, String>(props);
		
		//3.创建消息
		// 参数一:主题名, 参数二: key,作⽤是决定了往哪个分区上发,参数三:value,具体要发送的消息内容
		// 具体的发送分区就算公司:hash(key)%partitionNum , 或者直接
		ProducerRecord<String,String> producerRecord = new ProducerRecord<> (TOPIC_NAME,"mykeyvalue","hellokafka");
		// new ProducerRecord<> (TOPIC_NAME, 0 ,"mykeyvalue","hellokafka"); 也可以指定分区id


		//4.发送消息,得到消息发送的元数据并输出
		RecordMetadata metadata = producer.send(producerRecord).get();
		
		System.out.println("同步⽅式发送消息结果:"+"topic-"+metadata.topic() +"|partition-"
			+ metadata.partition()+"|offset-"+metadata.offset());
		producer.close();
	}
}
4.2.1.2 生产者的同步发送消息

在这里插入图片描述
生产者同步发送消息,在收到kafka的ack告知发送成功之前一直处于阻塞状态;

通过配置我们知道:阻塞3S的时间,如果还没有收到ack确定,会重试3次,还没有就报错了抛InterruptedException异常,我们可以捕获处理

// 发送消息,得到消息发送的元数据并输出
		RecordMetadata metadata = producer.send(producerRecord).get();  // 同步发送

在这里插入图片描述

4.2.1.3 生产者的异步发送消息

在这里插入图片描述
不需要等待kafka返回一个ack信息给生产者,直接默认成功,发送完消息后就可以直接执行后面的业务,kafka的breker在收到消息后异步调用生产者提供的callback回调方法。

异步发送 是指使用 异步回调方式 发送消息

在这里插入图片描述

问题:那发送消息使用异步多还是同步多? – 同步

虽然异步会提升性能,但是不能保证消息是否不会丢失,且发送消息用异步性能提升不明显,所以使用同步发送消息较多。

4.2.1.4 生产者中ack的配置

同步发送的前提下,生产者在获得集群返回的ack之前会一直阻塞。那么集群什么时候返回ack呢?此时ack有三个配置:

  • ack= 0, kafka-clister不需要任何的broker收到消息,就立即返回ack给生产者,最容易丢失消息,但是效率最高。
  • ack= 1(默认),多副本之间的leader已经收到消息,并把消息写入到本地的log中(00000.log),才会返回ack给生产者,性能和安全性是最均衡的。
  • ack = -1/all,里面的配置min.insync.replicas=2(默认为1,推荐配置大于等于2),此时就需要 leader写入本地log中和一个follower同步完成后,才会返回ack给生产者(此时集群中有2【前面设置的min.insync.replicas】个broker已完成数据的接收),这种方式最安全,但是性能最差。

下⾯是关于ack和重试(如果没有收到ack,就开启重试)的配置

props.put(ProducerConfig.ACKS_CONFIG, "1"); // 设置ack = 1
/*
发送失败会重试,默认重试间隔100ms,重试能保证消息发送的可靠性,但是也可能造成消息重复发送,
⽐如⽹络抖动,所以需要在接收者那边做好消息接收的**幂等性处理**
*/
props.put(ProducerConfig.RETRIES_CONFIG, 3);  // 重试次数

//重试间隔设置,ms
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300); 

重试发送消息,此时可能会有重复发送的问题,解决方法见下;

4.2.1.5 关于消息发送的缓冲区

消息不是一条一条的发送到kafka;
消息从生产者到kafka之间有缓存区;

在这里插入图片描述

代码:
在这里插入图片描述

4.2.2 客户端(消费者)的实现

4.2.2.1 基本的消费组代码实现
<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-clients</artifactId>
	<version>2.4.1</version>
</dependency>
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class MySimpleConsumer {

	private final static String TOPIC_NAME= "my-replicated-topic";
	private final static String CONSUMER_GROUP_NAME ="testGroup";
	
	public static void main(String[] args) {
		Properties props = new Properties();
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"172.16.253.38:9092,172.16.253.38:9093,172.16.253.38:9094");
		
		//消费分组名,及其序列化
		props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
		
		//1.创建⼀个消费者的客户端
		KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
		//2.消费者订阅主题列表
		consumer.subscribe(Arrays.asList(TOPIC_NAME));
		while (true) {
			/*
			* 3.poll()   API是拉取消息的⻓轮询
			*/
			ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
			for (ConsumerRecord<String, String> record : records) { // 消费poll下来的消息
				//4.打印消息
				System.out.printf("收到消息:partition = %d,offset = %d, key =%s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
			}
		}
	}
}

4.2.2.2 关于消费者自动提交和手动提交offset偏移量

1)提交的内容
消费者无论是自动提交还是手动提交,都需要把所属的消费组+消费的某个主题+消费的某个分区及消费的偏移量,这样的信息提交到集群的_consumer_offsets主题里面。

在这里插入图片描述

2)自动提交
消费组poll消息下来以后就会自动提交offset

// 是否⾃动提交offset,默认就是true
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
//⾃动提交offset的间隔时间
props.put(ConsumerConfig.AUTO_COM`在这里插入代码片`MIT_INTERVAL_MS_CONFIG, "1000");

注意:自动提交会丢消息。因为消费者在消费之前提交offset,有可能提交完成后还没有消费时消费者就挂了。

3)手动提交

需要把自动提交的配置改成fasle
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, “false”);

手动提交分为两种

  • 手动同步提交(常用)
    在消费完消息后调用同步提交的方法,当集群返回ack前一直阻塞,返回ack后表示提交成功,执行之后的逻辑
    在这里插入图片描述
    consumer.commitSync();此时会阻塞,等ack

  • 手动异步提交
    在消息消费完后提交,不需要等到集群ack,直接执行之后的逻辑,可以设置一个回调方法,供集群调用

在这里插入图片描述
consumer.commitASync();此时不会阻塞

一般使用同步提交就可以了。

4.2.2.3 长轮询poll消息

默认情况下,消费者一次会poll500条消息。

//⼀次poll最⼤拉取消息的条数,可以根据消费速度的快慢来设置
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);

代码中设置了⻓轮询的时间是1000毫秒

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

意味着:

  • 如果⼀次poll到500条,就直接执⾏for循环
  • 如果这⼀次没有poll到500条。且时间在1秒内,那么⻓轮询继续poll,要么到500条,要么到1s
  • 如果多次poll都没达到500条,且1秒时间到了,那么直接执⾏for循环

如果两次poll的间隔超过30s,集群会认为该消费者的消费能⼒过弱,该消费者被踢出消费组,触发rebalance机制,rebalance机制会造成性能开销。可以通过设置这个参数,让⼀次poll的消息条数少⼀点

//⼀次poll最⼤拉取消息的条数,可以根据消费速度的快慢来设置
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
//如果两次poll的时间如果超出了30s的时间间隔,kafka会认为其消费能⼒过弱,将其踢出消费组。将分区分配给其他消费者。-rebalance
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30*1000);
4.2.2.4 消费者的健康状态检查

消费者每隔1s向kafka集群发送⼼跳,集群发现如果有超过10s没有续约的消费者,将被踢出消费组,触发该消费组的rebalance机制,将该分区交给消费组⾥的其他消费者进⾏消费。

//consumer给broker发送⼼跳的间隔时间
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
//kafka如果超过10秒没有收到消费者的⼼跳,则会把消费者踢出消费组,进⾏rebalance,把分区分配给其他消费者。
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10*1000);
4.2.2.5 指定分区和偏移量、时间消费
  • 指定分区消费
    在这里插入图片描述

  • 从头消费(回溯消费)
    在这里插入图片描述

  • 指定offset消费
    在这里插入图片描述

  • 指定时间点开始消费
    根据时间,去所有的partition中确定该时间对应的offset,然后去所有的partition中找到该offset之后的消息开始消费。

在这里插入图片描述

4.2.2.6 新消费组的消费offset偏移量 规则新消费组中的消费者在启动以后,

默认会从当前分区的最后⼀条消息的offset+1开始消费(消费新消息)。可以通过以下的设置,让新的消费者第⼀次从头开始消费。之后开始消费新消息(最后消费的位置的偏移量+1)

  • Latest:默认的,消费新消息(只消费启动之后发送到主题的消息)
  • earliest:第⼀次从头开始消费。之后开始消费新消息(最后消费的位置的偏移量+1)
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

五、springboot中使用kafka

5.1 基本使用

1.引入依赖

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

2.编写配置文件
在这里插入图片描述

  1. 编写消息生产者
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/msg")
public class MyKafkaController {

	private final static String TOPIC_NAME = "my-replicated-topic";

	@Autowired
	private KafkaTemplate<String,String> kafkaTemplate;

	@RequestMapping("/send")
	publicStringsendMessage(){
		kafkaTemplate.send(TOPIC_NAME,0,"key","this is a message!");
		return "send success!";
	}
}
  1. 编写消息消费者
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
publicclassMyConsumer {

	@KafkaListener(topics="my-replicated-topic",groupId="MyGroup1") // 监听主题,属于的消费组
	public void listenGroup(ConsumerRecord<String, String> record,  Acknowledgment ack) {  // record是指一条消息记录
		String value =record.value();
		System.out.println(value);
		System.out.println(record);
		//⼿动提交offset,自动提交,这个ack没有作用。
		ack.acknowledge();
	}
	
	@KafkaListener(topics="my-replicated-topic",groupId="MyGroup2") // 监听主题,属于的消费组
	public void listensGroup(ConsumerRecord<String, String> records,  Acknowledgment ack) {  // record是指一批消息记录
		records进行操作
		//⼿动提交offset
		ack.acknowledge();
	}
}

  1. 拓展 - 消费者中配置消费主题、分区和偏移量
@KafkaListener(groupId="testGroup", topicPartitions= {
	@TopicPartition(topic="topic1", partitions= {"0", "1"}), // topic1主题的0,1分区
	@TopicPartition(topic="topic2", partitions="0", // topic2主题的0 分区
		partitionOffsets=@PartitionOffset(partition="1",initialOffset="100"))}  // 对于任意主题的1号分区,从100偏移量开始消费
, concurrency="3")   //concurrency就是同组下创建的消费者个数,就是并发消费数,建议⼩于等于分区总数
public void listenGroupPro(ConsumerRecord<String, String> record, Acknowledgment ack) {
	String value = record.value();
	System.out.println(value);
	System.out.println(record);
	//⼿动提交offset
	ack.acknowledge();
}

5.2 kafka集群中controller、Rebalance 、 HW

5.2.1 Controller

  • 集群中谁来充当controller
    每个broker启动时会在zk创建一个临时序号节点,获得的序号最小的那个broker将会作为集群中的controller(所以很多时候是0的broker当controller)

  • controller的作用:负责管理整个集群中所有分区和副本的状态。

    1. 当集群中有一个副本的leader挂掉,需要在集群中选举出一个新的leader,选举的规则是从isr集合中最左边获得
    2. 当集群中有broker新增或减少,controller会同步信息给其他的broker
    3. 当集群中有分区新增或减少,controller会同步信息给其他的broker

5.2.2 Rebalance机制

  • 前提:消费组中的消费者没有指明分区来消费
  • 触发条件:当消费组中的消费者和分区的关系发生变化的时候
  • 分区分配策略:在rebalance之前,分区真么分配会有这么三种策略
    1. range:根据公式计算得到某个消费者 消费哪个分区;前面的消费者是 分区总数/消费者数量 + 1,之后的消费者是分区总数/消费者数量
      在这里插入图片描述

    2. 轮询:大家轮着来(消费者1消费p036,消费者2消费p147,消费者3消费25)

    3. sticky:粘合策略,如果需要rebalance,会在之前已分配的基础上调整,不会改变之前的分配情况。如果这个策略没开,那么就进行全部的重新分配,建议开启。

5.2.3 HW和LEO

LEO是某个副本最后消息的消息位置(log-end-offset)

HW俗称高水位 ,是已完成同步的位置。消息在写⼊broker时,且每个broker完成这条消息的同步后,hw才会变化。
在这之前消费者是消费不到这条消息的。在同步完成之后,HW更新之后,消费者才能消费到这条消息,这样的⽬的是防⽌消息的丢失。

作用:防止重复消费

在这里插入图片描述

六、kafka线上问题优化

6.1 如何防止消息丢失

生产者:使用同步发送消息 + 把ack设置成1或者all + 设置同步的分区数 >=2
消费者:把自动提交改成手动提交

6.2 如何防止重复消费

在防止消息丢失的方案中,如果生产者发送完消息后,因为网络抖动,没有收到ack,但实际上broker已经收到了。
此时,生产者会进行重试,于是broker就会收到多条相同的消息,而照成消费者的重复消费。
解决:
- 生产者关闭重试:会造成丢失消息(不建议)
- 消费者解决非幂等性消息问题:
所谓幂等性:多次访问的结果都是一样的。对于rest的请求(get、put、delete幂等,post非幂等)
方案:
1. 在数据库中创建联合主键,防止相同的主键创建出多条记录
2. 使用分布式锁,以业务id为锁,保证只有一条记录能创建成功

在这里插入图片描述

6.3 如何做到消息的顺序消费

  • 生产者:保证消息按顺序消费,且消息不丢失 – 使用同步发送,ack设置成非0的值

  • 消费者:主题只能设置一个分区,消费组中只能有一个消费者(1对1)
    此时,是牺牲了性能。

在这里插入图片描述
在这里插入图片描述

6.4 如何解决消息积压问题

消息积压(生产能力远大于消费能力)会导致很多问题,比如磁盘呗打满、生产端发消息导致kafka性能过慢,就容易出现服务雪崩,就需要有相应的手段:

  • 方案1:在一个消费者中启动多线程,让多个线程同时消费。 ----- 提升消费者的消费能力
  • 方案2:如果方案1还不够,这个时候可以启动多个消费者,多个消费者部署在不同服务器上。
  • 方案3: 让一个消费者去吧收到的消息往另一个topic上发,另一个topic设置多个分区和多个消费者,进行具体的业务消费

6.5 延时队列的实现

延迟队列的应用场景:在订单创建成功后如果超过30分钟没有付款,则需要取消订单

  • 创建多个topic每个topic标识延时的间隔
    1.topic_5s:延时5s执行的队列
    2.topic_1m:
    3.topic_30m:
  • 消息发送者发送消息到相应的topic,并带上消息的发送时间
  • 消费者订阅相应的topic,消费是轮询消费整个topic中的信息
    1.如果消息的发送时间,和消费的当前时间超过预设的值,比如30分钟
    2.如果消息的发送时间,和消费的当前时间没超过预设的值,则不消费当前的offset及之后的offset的所有消息都消费
    3.下次继续消费改offset出的消息,判断时间是否以满足预设值

在这里插入图片描述

在这里插入图片描述

七、kafka-eagle 监控平台

1.去kafka-eagle官网下载压缩包 : https://www.kafka-eagle.org/

2.分配一台虚拟机,安装jdk

3.解压缩第一步的压缩包

在这里插入图片描述

4.给kafka-eagle配置环境变量,修改zk和数据库的地址

在这里插入图片描述
在这里插入图片描述
启动查看
在这里插入图片描述
看到

在这里插入图片描述

请不要吝啬你发财的小手,点赞收藏评论,谢谢!

请不要吝啬你发财的小手,点赞收藏评论,谢谢!

请不要吝啬你发财的小手,点赞收藏评论,谢谢!

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐