两个系统各自都有各自要去做的事,所以只能将消息放到一个中间平台(中间件)

Kafka

分布式流媒体平台
在这里插入图片描述
程序发消息,程序接收消息
在这里插入图片描述
Producer:Producer即生产者,消息的产生者,是消息的入口。
Broker:Broker是kafka实例,每个服务器上有一个或多个kafka的实例,我们姑且认为每个broker对应一台服务器。每个kafka集群内的broker都有一个不重复的编号,如图中的broker-0、broker-1等……
Topic:消息的主题,可以理解为消息的分类,kafka的数据就保存在topic。在每个broker上都可以创建多个topic。
Partition:Topic的分区,每个topic可以有多个分区,分区的作用是做负载,提高kafka的吞吐量。同一个topic在不同的分区的数据是不重复的,partition的表现形式就是一个一个的文件夹!
Replication:每一个分区都有多个副本,副本的作用是做备胎。当主分区(Leader)故障的时候会选择一个备胎(Follower)上位,成为Leader。在kafka中默认副本的最大数量是10个,且副本的数量不能大于Broker的数量,follower和leader绝对是在不同的机器,同一机器对同一个分区也只可能存放一个副本(包括自己)。
Message:每一条发送的消息主体。
Consumer:消费者,即消息的消费方,是消息的出口。
Consumer Group:我们可以将多个消费组组成一个消费者组,在kafka的设计中同一个分区的数据只能被消费者组中的某一个消费者消费。同一个消费者组的消费者可以消费同一个topic的不同分区的数据,这也是为了提高kafka的吞吐量!
同一个组下,订阅的主题只能有一个消费者收到消息(一对一)
放到不同 的组下,就能实现一对多
Zookeeper:kafka集群依赖zookeeper来保存集群的的元信息,来保证系统的可用性。
Kafka对于zookeeper是清以来,保存kafka相关节点数据,管理节点。安装Kafka前需先安装zookeeper

1.Kafka生产者和消费者详解

Kafka设计与原理详解

生产者

在这里插入图片描述

消费者

在这里插入图片描述
在这里插入图片描述
谁增加while 让它一直处于监听状态

2.分区机制

加你个主题划分为多个分区(Partition)
可以处理更多的消息,不受单服务器限制,可以不受限的处理更多数据。
消息发送时都被发送到一个topic,其本质就是一个目录,而topic由是由一些Partition Logs(分区日志)组成,其组织结构如下图所示:
在这里插入图片描述
我们可以看到,每个Partition中的消息都是有序的,生产的消息被不断追加到Partition log上,其中的每一个消息都被赋予了一个唯一的offset值
Kafka需要维持的元数据只有一个–消费消息在Partition中的offset值(偏移量-连续自增的数值),Consumer每消费一个消息,offset就会加1。其实消息的状态完全是由Consumer控制的,Consumer可以跟踪和重设这个offset值,这样的话Consumer就可以读取任意位置的消息。
把消息日志以Partition的形式存放有多重考虑,第一,方便在集群中扩展,每个Partition可以通过调整以适应它所在的机器,而一个topic又可以有多个Partition组成,因此整个集群就可以适应任意大小的数据了**;第二就是可以提高并发**,因为可以以Partition为单位读写了。

3.Kafka高可用机制

集群

在这里插入图片描述

备份

有了备份机制后,Kafka允许集群中的节点挂掉后而不影响整个集群工作。一个备份数量为n的集群允许n-1个节点失败。在所有备份节点中,有一个节点作为lead节点,这个节点保存了其它备份节点列表,并维持各个备份间的状体同步。
在这里插入图片描述
Kafka 提供了一种基于副本(Replication)的备份机制,以确保数据的高可用性和容错能力。以下是 Kafka 备份机制的详细说明:

  • 1.副本(Replicas)与分区(Partitions)
    在 Kafka 中,主题(Topic)被划分为多个分区(Partition),每个分区都有多个副本。Leader 副本负责处理所有对该分区的读写请求,而Follower 副本则从 Leader 副本同步数据。这样,即使某个 Broker(即 Leader 副本所在节点)发生故障,其他 Broker 上的 Follower 副本可以迅速晋升为新的 Leader,继续提供服务。

  • 2.副本分配与复制
    副本分配:Kafka 使用 ZooKeeper 管理元数据,包括分区与副本的分配信息。在创建主题时,可以指定每个分区的副本数(通常称为副本因子)。Kafka 会根据 Broker 配置和可用性,将分区的副本均匀地分布到不同的 Broker 上,以实现负载均衡和容错。

数据复制:Producer 发送消息到 Leader 副本。Leader 副本将消息写入其本地日志后,立即将消息发送给所有 Follower 副本。Follower 副本接收到消息后,将其写入本地日志。这种同步复制或异步复制(取决于配置)机制确保了数据在集群中的复制。

  • 3.ISR(In-Sync Replicas)与副本同步
    ISR:Kafka 维护了一个名为 ISR(In-Sync Replicas)的集合,包含所有与 Leader 副本保持同步的 Follower 副本。只有 I**SR 中的副本被认为是可以安全地晋升为 Leader 的候选者。**当 Follower 副本由于网络延迟、Broker 故障等原因与 Leader 副本失去同步时,会被暂时移出 ISR。

副本同步:Kafka 通过心跳机制监控 Follower 副本与 Leader 副本的同步状态。Follower 副本定期向 Leader 副本发送心跳,报告其已复制的消息偏移量。Leader 副本根据心跳信息判断 Follower 副本是否处于同步状态,并据此更新 ISR 集合。

**4. Leader 选举与故障恢复
Leader 选举:当 Leader 副本所在的 Broker 发生故障时,ZooKeeper 会检测到并触发 Leader 选举。从 I
SR 集合中选择一个 Follower 副本晋升为新的 Leader。**其余 Follower 副本随后将与新的 Leader 建立连接并开始同步。

故障恢复:一旦新的 Leader 副本被选举出来,Producer 和 Consumer 可以无缝地切换到新的 Leader 进行读写操作。对于未完成同步的 Follower 副本,它们将在恢复连接后从新的 Leader 处拉取缺失的数据,直至重新加入 ISR。

4.消息可靠性

在消息系统中,保证消息在生产和消费过程中的可靠性是十分重要的,在实际消息传递过程中,可能会出现如下三中情况:

  • 一个消息发送失败
  • 一个消息被发送多次
  • 最理想的情况:exactly-once ,一个消息发送成功且仅发送了一次

有许多系统声称它们实现了exactly-once,但是它们其实忽略了生产者或消费者在生产和消费过程中有可能失败的情况。比如虽然一个Producer成功发送一个消息,但是消息在发送途中丢失,或者成功发送到broker,也被consumer成功取走,但是这个consumer在处理取过来的消息时失败了。
(1)从Producer端看:Kafka是这么处理的,当一个消息被发送后,Producer会等待broker成功接收到消息的反馈(可通过参数控制等待时间),如果消息在途中丢失或是其中一个broker挂掉,Producer会重新发送(我们知道Kafka有备份机制,可以通过参数控制是否等待所有备份节点都收到消息)。
(2)从Consumer端看:前面讲到过partition,broker端记录了partition中的一个offset值,这个值指向Consumer下一个即将消费message。当Consumer收到了消息,但却在处理过程中挂掉,此时Consumer可以通过这个offset值重新找到上一个消息再进行处理。Consumer还有权限控制这个offset值,对持久化到broker端的消息做任意处理。

5.生产者详解

发送类型

在这里插入图片描述
在这里插入图片描述
异步时响应回调函数。
消息发送失败时会抛出异常,这是可以拿到异常,用于记录或做出补偿。

6.消费者详解

在这里插入图片描述
在这里插入图片描述
只有在一个分区,才能保证消息有序。

手动提示偏移量

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述
同步提交尽可能会造成方法堵塞。在这里插入图片描述
在这里插入图片描述
如果异步提交成功则会一直在while(true)监听中,如果出现异常才会捕获异常,捕获完异常后才会执行finally中的同步提交操作

7.Spring集成Kafka

在这里插入图片描述

在这里插入图片描述
异步,通过回调方式实现:

//发送消息
public void sendMessage(String message) {
            
    ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topicName, message);
	
    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
        @Override
        public void onSuccess(SendResult<String, String> result) {
            System.out.println("Sent message=[" + message + "] with offset=[" + result.getRecordMetadata().offset() + "]");
        }
        @Override
        public void onFailure(Throwable ex) {
            System.out.println("Unable to send message=[" + message + "] due to : " + ex.getMessage());
        }
    });
}
//接收消息
@KafkaListener(topics = "topicName", groupId = "foo")
public void listenGroupFoo(String message) {
    System.out.println("Received Message in group foo: " + message);
}

一个消费者可以监听多个主题:

@KafkaListener(topics = "topic1, topic2", groupId = "foo")

Spring也支持获取一个或多个消息头信息,通过在监听器上是哟个@Header注解:

@KafkaListener(topics = "topicName")
public void listenWithHeaders(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
      System.out.println("Received Message: " + message" + "from partition: " + partition);
}

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐