Kafka

 

概述:

Kafka是最初由Linkedin公司开发,用scala语言编写的,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统。

它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、web日志、消息服务等。

它类似于JMS(Java Messgae System),但是他不是JMS规范的实现。

 

JMS规范的消息系统中消息收发分类:

点对点/队列模式,一般基于拉取或轮询来接收消息,在这种模式中,可以有多个消费者同时在queue(队列)中侦听同一消息,但一条消息只会被一个消费者接收。既支持即发即弃的消息传送方式也支持同步请求/应答的消息传送方式。

发布/订阅模式,既可以支持推送来接收消息,也可以通过拉取或轮询的形式来接收。发布到一个topic(主题)的消息可以被多个订阅者接收,解耦能力更强。

 

Kafka的消息系统组成:

kafka对消息保存时根据Topic进行归类,发送消息者成为Producer,消息接受者成为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)成为broker(中介,拉皮条的)。无论是kafka集群,还是producer和consumer都依赖于zookeeper来保证系统可用性,zookeeper保存kafka集群的meta(元数据)信息(broker/consumer)。

 

                   其他消息队列简单类比

RabbitMQ: Erlang编写,支持多协议AMQP,XMPP,SMTP,STOMP。支持负载均衡、数据持久化。同时支持Peer-to-Peer和发布/订阅模式

Redis:基于Key-Value对的NoSQL数据库,同时支持MQ功能,可做轻量级队列服务使用。就入队操作而言,Redis对短消息(小于10KB)的性能比RabbitMQ好,长消息的性能比RabbitMQ差。

ZeroMQ:轻量级,不需要单独的消息服务器或中间件,应用程序本身扮演该角色,Peer-to-Peer。它实质上是一个库,需要开发人员自己组合多种技术,使用复杂度高

ActiveMQ:JMS实现,Peer-to-Peer,支持持久化、XA事务

Kafka/Jafka:高性能跨语言的分布式发布/订阅消息系统,数据持久化,全分布式,同时支持在线和离线处理

MetaQ/RocketMQ:纯Java实现,发布/订阅消息系统,支持本地事务和XA分布式事务

消息队列适用场景和设计目标:

         场景:

                  解耦 各位系统之间通过消息系统这个统一的接口交换数据,无须了解彼此的存在

冗余 部分消息系统具有消息持久化能力,可规避消息处理前丢失的风险

扩展 消息系统是统一的数据接口,各系统可独立扩展

峰值处理能力 消息系统可顶住峰值流量,业务系统可根据处理能力从消息系统中获取并处理对应量的请求

可恢复性 系统中部分组件失效并不会影响整个系统,它恢复后仍然可从消息系统中获取并处理数据

异步通信 在不需要立即处理请求的场景下,可以将请求放入消息系统,合适的时候再处理

 

我大概比喻下Kafka最基本的消息队列使用场景

         Kafka是生产者和消费者之间的中间件

在生活中比喻---

妈妈:生产者

你:消费者

馒头:数据流、消息

正常情况下: 生产一个  消费一个(效率低下)

其他情况: 

1)一直生产,你吃到某一个馒头时,你噎死了(机器故障), 馒头就没人要了(数据丢失了)。

2)一直生产,做馒头速度快,你来不及吃完,馒头也就丢失了

为了防止上述“其他情况”的出现,我们可以拿个碗/篮子,馒头做好以后先放到篮子里,你要吃的时候去篮子里面取出来吃,而这篮子/框就可以为:Kafka。当篮子满了,馒头就装不下了,咋办? 多准备几个篮子 === Kafka的扩容。

 

设计目标:

         高吞吐率:在廉价的商用机器上单机可支持每秒100万条消息的读写。

        消息持久化:所有消息均被持久化到磁盘,无消息丢失,支持消息重放。

        完全分布式:Producer,Broker,Consumer均支持水平扩展。

        同时适应在线流处理和离线批处理。

 

 

 

 

 

 

 

 

Kafka的架构和关键概念:

         架构:

    分布式(负载均衡提高处理性能和容错能力)

Kafka以集群的方式运行,可以由一个或多个kafka服务(server)组成,每个服务叫做一个broker. 生产者通过网络将消息发送到Kafka集群,集群向消费者提供消息。

每个partition(分区)在Kafka集群的若干服务(server)中都有副本/备份,这样这些持有副本的服务可以共同处理数据和请求,副本数量是可以配置的。

每个分区有一个leader服务器,若干个followers服务器,leader负责处理消息的读和写,followers则去复制leader,如果leader down了,followers中的一台则会自动成为leader。

 

 

         关键概念:

                   生产者Producer: 将向Kafka topic发布消息的程序成为producers.

                   消费者Consumer:将订阅topics并消费消息的程序成为consumer.

                   中介,拉皮条的broker集群中的每个kafka服务实例。

                   Topic

这是一个逻辑概念,可以理解为消息的主题或分类,每条消息都属于且只属于一个topic,生产者在发送消息时、消费者在接收消息时都必须指定消息的topic。一个topic中可以包含多个partition(分区)。

 

                  

       Partition:

这是一个物理概念,可以理解为一个partition物理上对应一个文件夹。一个partition只分布于一个broker上(不考虑备份的情况下),每个partition都是一个有序队列,分为多个大小相等的segment(对用户透明),每个segment对应一个文件,而segment文件由一条条不可变的记录组成(消息发出后就不可变更了),这里面的数据记录就是生产者发送的消息内容。

删除消息:

注意,消息记录只会被append到segment中,每条消息不会被单独删除或修改。当需要删除过期日志时,直接删除某一个segment。

删除可以通过配置定时删除,或者周期性检测记录文件大小来删除过大文件,比如:1.每两天定时删除每个分区中最老的segment。2.每12小时扫描一次,如果某个分区中的文件大小超过1G,则删除其中最老的一个segment。

文件组织和命名:

Partition在物理上一般以topic名+从0开始的有序序列命名,第一个partiton序号从0开始,序号最大值为partitions数量减1

每条消息有一个连续有序独享的序列号offset来标记消息位置,

segment文件命名规则:partion全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset。数值最大为64位long大小,19位数字字符长度,没有数字用0填充。

 

 

 

概念范围:

kafka集群à单个brokeràtopicàpartitionàsegmentà单条消息记录

 

                  

消息发布模式

对于kafka来说,发布消息通常有两种模式:

队列模式(queuing)和发布-订阅模式(publish-subscribe)

队列模式中,consumers可以同时从服务端读取消息,每个消息只被其中一个consumer读到;

发布-订阅模式中消息被广播到所有的consumer,Consumers可以加入一个consumer 组,共同竞争一个topic,topic中的消息将被分发到组中的一个成员中。同一组中的consumer可以在不同的程序中,也可以在不同的机器上

 

如果所有的consumer都在一个组中,这就成为了传统的队列模式,在各consumer中实现负载均衡。

如果所有的consumer都不在不同的组中,这就成为了发布-订阅模式,所有的消息都被分发到所有的consumer中。

更常见的是,每个topic都有若干数量的consumer组,每个组都是一个逻辑上的“订阅者”,为了容错和更好的稳定性,每个组由若干consumer组成。这其实就是一个发布-订阅模式,只不过订阅者是个组而不是单个consumer。

 

相比传统的消息系统,Kafka可以很好的保证有序性。
传统的队列在服务器上保存有序的消息,如果多个consumers同时从这个服务器消费消息,服务器就会以消息存储的顺序向consumer分发消息。虽然服务器按顺序发布消息,但是消息是被异步的分发到各consumer上,所以当消息到达时可能已经失去了原来的顺序,这意味着并发消费将导致顺序错乱。为了避免故障,这样的消息系统通常使用“专用consumer”的概念,其实就是只允许一个消费者消费消息,当然这就意味着失去了并发性。

在这方面Kafka做的更好,通过分区的概念,Kafka可以在多个consumer组并发的情况下提供较好的有序性和负载均衡。将每个分区分只分发给一个consumer组,这样一个分区就只被这个组的一个consumer消费,就可以顺序的消费这个分区的消息。因为有多个分区,依然可以在多个consumer组之间进行负载均衡。注意consumer组的数量不能多于分区的数量,也就是有多少分区就允许多少并发消费。

Kafka只能保证一个分区之内消息的有序性,在不同的分区之间是不可以的,这已经可以满足大部分应用的需求。如果需要topic中所有消息的有序性,那就只能让这个topic只有一个分区,当然也就只有一个consumer组消费它。

 

 

 

 

 

 

 

 

 

kafka操作相关

        

消息分区partitioner:可以通过写一个类实现partitioner接口来定义消息分区规则,(配置producer时会引用该类来使用规则)

         举例

HashPartitioner哈希规则

 

RoundRobin轮询调度规则

 

 

Kafka消息发送方式:

         同步Sync 异步Async 单次OneWay

         kafka有同步(sync)、异步(async)以及oneway这三种发送方式,

         同步和异步由配置参数producer.type指定,

         Oneway由配置参数request.require.acks指定。

 

官方文档中的注释:producer.type的默认值是sync,即同步的方式。这个参数指定了在后台线程中消息的发送方式是同步的还是异步的。如果设置成异步的模式,可以允许生产者以batch的形式push数据,这样会极大的提高broker的性能,但是这样会增加丢失数据的风险。

 

以batch的方式推送数据可以极大的提高处理效率,kafka producer可以将消息在内存中累计到一定数量后作为一个batch发送请求。batch的数量大小可以通过producer的参数(batch.num.messages)控制。通过增加batch的大小,可以减少网络请求和磁盘IO的次数,当然具体参数设置需要在效率和时效性方面做一个权衡。在比较新的版本中还有batch.size这个参数。

异步模式还有四个配置参数:

queue.buffering.max.ms

默认5000

启用异步模式时,producer缓存消息的时间。比如我们设置成1000时,它会缓存1s的数据再一次发送出去,这样可以极大的增加broker吞吐量,但也会造成时效性的降低。

queue.buffering.max.messages

默认10000

启用异步模式时,producer缓存队列里最大缓存的消息数量,如果超过这个值,producer就会阻塞或者丢掉消息。

queue.enqueue.timeout.ms

默认

-1

当达到上面参数时producer会阻塞等待的时间。如果设置为0,buffer队列满时producer不会阻塞,消息直接被丢掉;若设置为-1,producer会被阻塞,不会丢消息。

batch.num.messages

默认

200

启用异步模式时,一个batch缓存的消息数量。达到这个数值时,producer才会发送消息。(每次批量发送的数量)

 

producers可以一步的并行向kafka发送消息,但是通常producer在发送完消息之后会得到一个响应,返回的是offset或者发送过程中遇到的错误。这其中有个非常重要的参数“request.required.acks”,这个参数决定了producer要求leader partition收到确认的副本个数,如果acks设置为0,表示producer不会等待broker的响应,所以,producer无法知道消息是否发生成功,这样有可能导致数据丢失,但同时,acks值为0会得到最大的系统吞吐量。

若acks设置为1,表示producer会在leader partition收到消息时得到broker的一个确认,这样会有更好的可靠性,因为客户端会等待知道broker确认收到消息。若设置为

-1,producer会在所有备份的partition收到消息时得到broker的确认,这个设置可以得到最高的可靠性保证。

其中,在异步发送方式时将request.required.acks参数值设置为0,即为启用了oneway发送方式,简单来说就是只顾消息发送出去不管死活,不关心是否被成功接收,但是低延迟,高吞吐,适配于对可靠性基本没有要求的情景。

 

一般配置:

对于sync的发送方式:
producer.type=sync
request.required.acks=1

对于async的发送方式:
producer.type=async
request.required.acks=1
queue.buffering.max.ms=5000
queue.buffering.max.messages=10000
queue.enqueue.timeout.ms = -1
batch.num.messages=200

对于oneway的发送发送:
producer.type=async
request.required.acks=0

Producer相关:

         Kafka有四个核心的API

1.ProducerAPI:允许一个应用向一个或多个topic里发布记录流;

2.ConsumerAPI:允许一个应用订阅一个或多个topics,处理topic里的数据流,就相当于消费;

3.StreamAPI:允许应用扮演流处理的作用,从一个或多个topic里消费数据流,然后产生输出流数据到其他一个或多个topic里,对输入流数据有效传输到输出口;

4.ConnectorAPI:允许运行和构建一个可重复利用的生产者和消费者,能将kafka的topic与其他存在的应用和数据库设备相连接,比如链接一个实时数据库,可以捕捉到每张表的变化。

 

这四个API,主要应用在IDEA上对应用程序的开发中,通过代码的形式管理Kafka

此处提到producer和消息发送,顺便记录一下produceconsumerrIDEjava代码层面的内容和相关配置。

 

代码:

         首先可以写一个充当配置文件作用的接口,配置了Kafka的各种连接参数

   也可以不写,在producer和consumer类里面手动配置

  1. public interface KafkaProperties

  2. {

  3.     final static String zkConnect = "11.22.33.444:5555";

  4.     final static String groupId = "group1";

  5.     final static String topic = "topic1";

  6.     final static String kafkaServerURL = "11.22.33.444";

  7.     final static int kafkaServerPort = 5555;

  8.     final static int kafkaProducerBufferSize = 64 * 1024;

  9.     final static int connectionTimeOut = 20000;//等待时间ms

  10.     final static int reconnectInterval = 10000;//重连间隔ms

  11.     final static String topic2 = "topic2";

  12.     final static String topic3 = "topic3";

  13.     final static String clientId = "SimpleConsumerDemoClient";

  14. }

 

然后是producer类的代码:

public class KafkaProducerSimple {


 


 

    public static void main(String[] args) {

        /**

         * 1、指定当前kafka producer生产的数据的目的地

         *  创建topic可以输入以下命令,在kafka集群的任一节点进行创建。

         *  bin/kafka-topics.sh --create --zookeeper master:2181

         *  --replication-factor 1 --partitions 1 --topic orderMq

         */

        String TOPIC = "orderMq8";

        /**

         * 2、读取配置文件

         */

        Properties props = new Properties();

        /*

         * key.serializer.class默认为serializer.class

         */

        props.put("serializer.class", "kafka.serializer.StringEncoder");

        /*

         * kafka broker对应的主机,格式为host1:port1,host2:port2

         */

        props.put("metadata.broker.list", "master:9092,slaver1:9092,slaver2:9092");

        /*

         * request.required.acks,设置发送数据是否需要服务端的反馈,有三个值0,1,-1

         * 0,意味着producer永远不会等待一个来自broker的ack,这就是0.7版本的行为。

         * 这个选项提供了最低的延迟,但是持久化的保证是最弱的,当server挂掉的时候会丢失一些数据。

         * 1,意味着在leader replica已经接收到数据后,producer会得到一个ack。

         * 这个选项提供了更好的持久性,因为在server确认请求成功处理后,client才会返回。

         * 如果刚写到leader上,还没来得及复制leader就挂了,那么消息才可能会丢失。

         * -1,意味着在所有的ISR都接收到数据后,producer才得到一个ack。

         * 这个选项提供了最好的持久性,只要还有一个replica存活,那么数据就不会丢失

         */

        props.put("request.required.acks", "1");

        /*

         * 可选配置,如果不配置,则使用默认的partitioner partitioner.class

         * 默认值:kafka.producer.DefaultPartitioner

         * 用来把消息分到各个partition中,默认行为是对key进行hash。

         */

        props.put("partitioner.class", "com.bie.kafka.MyLogPartitioner");

        //props.put("partitioner.class", "kafka.producer.DefaultPartitioner");

        /**

         * 3、通过配置文件,创建生产者

         */

        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));

        /**

         * 4、通过for循环生产数据

注意,Message_ 是partitionKey用来配合partitioner分区规则来对消息进行分区

         */

        for (int messageNo = 1; messageNo < 100000; messageNo++) {

            String messageStr = new String("Messgae_"+ messageNo);


 

            /**

             * 5、调用producer的send方法发送数据

             * 注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发

             */

            producer.send(new KeyedMessage<String, String>(TOPIC, messageNo + "", "appid" + UUID.randomUUID() + messageStr));


 

            //producer.send(new KeyedMessage<String, String>(TOPIC, messageNo + "", "appid" + UUID.randomUUID() + "biexiansheng"));

        }

    }

}

 

 

下面是consumer类的代码:

public class KafkaConsumerSimple implements Runnable {

    public String title;

    public KafkaStream<byte[], byte[]> stream;

    public KafkaConsumerSimple(String title, KafkaStream<byte[], byte[]> stream) {

        this.title = title;

        this.stream = stream;

    }

    @Override

    public void run() {

        System.out.println("开始运行 " + title);

        ConsumerIterator<byte[], byte[]> it = stream.iterator();

        /**

         * 不停地从stream读取新到来的消息,在等待新的消息时,hasNext()会阻塞

         * 如果调用 `ConsumerConnector#shutdown`,那么`hasNext`会返回false

         * */

        while (it.hasNext()) {

            MessageAndMetadata<byte[], byte[]> data = it.next();

            Object topic = data.topic();

            int partition = data.partition();

            long offset = data.offset();

            String msg = new String(data.message());

            System.out.println(String.format(

                    "Consumer: [%s],  Topic: [%s],  PartitionId: [%d], Offset: [%d], msg: [%s]",

                    title, topic, partition, offset, msg));

        }

        System.out.println(String.format("Consumer: [%s] exiting ...", title));

    }


 

    public static void main(String[] args) throws Exception{

        Properties props = new Properties();

        props.put("group.id", "biexiansheng");

        props.put("zookeeper.connect", "master:2181,slaver1:2181,slaver2:2181");

        props.put("auto.offset.reset", "largest");

        props.put("auto.commit.interval.ms", "1000");

        props.put("partition.assignment.strategy", "roundrobin");

        ConsumerConfig config = new ConsumerConfig(props);

        String topic1 = "orderMq8";

        //String topic2 = "paymentMq";

        //只要ConsumerConnector还在的话,consumer会一直等待新消息,不会自己退出

        ConsumerConnector consumerConn = Consumer.createJavaConsumerConnector(config);

        //定义一个map

        Map<String, Integer> topicCountMap = new HashMap<>();

        topicCountMap.put(topic1, 3);

        //Map<String, List<KafkaStream<byte[], byte[]>> 中String是topic, List<KafkaStream<byte[], byte[]>是对应的流

        Map<String, List<KafkaStream<byte[], byte[]>>> topicStreamsMap = consumerConn.createMessageStreams(topicCountMap);

        //取出 `kafkaTest` 对应的 streams

        List<KafkaStream<byte[], byte[]>> streams = topicStreamsMap.get(topic1);

        //创建一个容量为4的线程池

        ExecutorService executor = Executors.newFixedThreadPool(3);

        //创建20个consumer threads

        for (int i = 0; i < streams.size(); i++) {

            executor.execute(new KafkaConsumerSimple("消费者" + (i + 1), streams.get(i)));

        }

    }

}

       


 

Kafka在SpringBoot框架中整合了之后开发流程如下:


 


 

配置统一写在application.yml文件中。

# kafka

spring:

  kafka:

   

 

 

# kafka服务器地址(可以多个)

bootstrap-servers:
consumer:

     

 

 

# 指定一个默认的组名

      group-id: kafka2

     

 

 

# earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费

     

 

 

# latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据

     

 

 

# none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

      auto-offset-reset: earliest

      

 

 

# key/value的反序列化

      key-deserializer: org

 

 

.apache.kafka.common.serialization.StringDeserializer

      value-deserializer: org

 

 

.apache.kafka.common.serialization.StringDeserializer

    producer:

     

 

 

# key/value的序列化

      key-serializer: org

 

 

.apache.kafka.common.serialization.StringSerializer

      value-serializer: org

 

 

.apache.kafka.common.serialization.StringSerializer

     

 

 

# 批量抓取

      batch-size: 

      

 

 

# 缓存容量

      buffer-memory: 

      

 

 

# 服务器地址

      bootstrap-servers:

       #配置项还有很多可以按需配置


 

       或者将配置文件写在application.properties里(springboot1.5)

#============== kafka ===================

# 指定kafka 代理地址,可以多个

spring.kafka.bootstrap-servers=

 

#=============== producer  =======================

 

spring.kafka.producer.retries=0

# 每次批量发送消息的数量

spring.kafka.producer.batch-size=16384

spring.kafka.producer.buffer-memory=33554432

 

# 指定消息key和消息体的编解码方式

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer

spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

 

#=============== consumer  =======================

# 指定默认消费者group id

spring.kafka.consumer.group-id=test-consumer-group

 

spring.kafka.consumer.auto-offset-reset=earliest

spring.kafka.consumer.enable-auto-commit=true

spring.kafka.consumer.auto-commit-interval=100

 

# 指定消息key和消息体的编解码方式

spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer

spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer


 

    然后在 Spring Boot 中就可以使用 KafkaTemplate 发送消息,使用 @KafkaListener 监听消费指定主题的消息,在监听中设置监听的 topic ,topics 是一个数组所以是可以绑定多个主题的,可以同时监听多个 topic 的消息。。简单演示代码如下


 

例一:

@Controller

@EnableAutoConfiguration

public class SampleController {

 

    public static Logger logger = LoggerFactory.getLogger(SampleController.class);

 

    @Autowired

    private KafkaTemplate<String, String> template;

 

    @RequestMapping("/send")

    @ResponseBody

    String send(String topic, String key, String data) {

        template.send(topic, key, data);

        return "success";

    }

 

    public static void main(String[] args) throws Exception {

        SpringApplication.run(SampleController.class, args);

    }

 

    @KafkaListener(id = "t1", topics = "t1")

    public void listenT1(ConsumerRecord<?, ?> cr) throws Exception {

        logger.info("{} - {} : {}", cr.topic(), cr.key(), cr.value());

    }

 

    @KafkaListener(id = "t2", topics = "t2")

    public void listenT2(ConsumerRecord<?, ?> cr) throws Exception {

        logger.info("{} - {} : {}", cr.topic(), cr.key(), cr.value());

    }

 

}


 

例二:

       Producer:

         @Autowired

    private  KafkaTemplate<String,String> kafkaTemplate;//kafkaTemplate相当于生产者

 

    @RequestMapping(value = "/{topic}/send",method = RequestMethod.GET)

    public void sendMeessage(

            @RequestParam(value = "message",defaultValue = "hello world") String message,

            @PathVariable final String topic) {

        logger.info("start sned message to {}",topic);

        ListenableFuture<SendResult<String, String>> listenableFuture = kafkaTemplate.send(topic,message);//发送消息,topic不存在将自动创建新的topic

        listenableFuture.addCallback(//添加成功发送消息的回调和失败的回调

                result -> logger.info("send message to {} success",topic),

                ex -> logger.info("send message to {} failure,error message:{}",topic,ex.getMessage()));

    }

 

    @RequestMapping(value = "/default/send",method = RequestMethod.GET)

    public void sendMeessagedefault() {//发送消息到默认的topic

        logger.info("start send message to default topic");

        kafkaTemplate.sendDefault("你好,世界");

    }

       Consumer:

         //监听器必须实现MessageListener这个接口中onMessage方法

public class MyMessageListener implements MessageListener<String, String> {

    public final static Logger logger = LoggerFactory.getLogger(MyMessageListener.class);

 

    @Override//此方法处理消息

    public void onMessage(ConsumerRecord<String, String> data) {

        String topic = data.topic();//消费的topic

        logger.info("-------------recieve message from {} topic-------------", topic);

        logger.info("partition:{}", String.valueOf(data.partition()));//消费的topic的分区

        logger.info("offset:{}", String.valueOf(data.offset()));//消费者的位置

        logger.info("get message from {} topic : {}", topic, data.value());//接收到的消息

    }

}


 

               KafkaTemplate.send():

ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);

topic:主题名称;partition:要发送消息到哪个分区;timestamp:创建消息的时间;key:消息的键;value:消息的值。


 

Kafka的数据复制与Failover

       Replica(副本):

       当某个topic的replication-factor值为N且N>1时,该topic中的每个partition都会有N个副本(replica)。

       副本数应小于broker数量,即对每个partition而言每个broker上最多有一个replica,(partition的所有副本会均匀分布到所有broker上)----为了保证可用性

 


 

       Data replication要解决的问题:

      

  1. 如何propagate(消息传递):

Producer发送消息和consumer接收消息都是通过leader partition(主分区)来进行的,而follower partition(从分区,是leader的一个replica)会周期性的拉取主区中存储的消息数据,从而达到备份的作用,当leader宕机,follower会自动选出新的leader继续提供服务。


 

  1. 何时commit:

Commit是指leader告诉producer,你所发送的消息数据被我成功接收了。Kafka希望保证数据能够及时被follower拉取,从而保持数据不丢失和强可用性,因此何时commit很关键。Commit的方式有同步和异步两种,同步是指producer发出消息被leader接收后,待所有follower都拉取到数据后才向producer返回commit确认。异步是指只要leader拿到数据了,就返回commit。

Kafka在默认情况下用的是既不完全同步也不完全异步的方式,称为ISR方式(in-sync replica):

Leader会负责动态维护一个与其基本保持一致的replica列表,该列表称为ISR,如果列表中有一个follower比leader落后太多,或者超过一定时间没发起拉取数据请求,则leader会把它从ISR移除。当ISR中所有follower(replica)都向leader发送ack时,leader返回commit。

当leader检测到某个follower与自身基本一致且不在ISR中,会将其加入到ISR。

Consumer只能读取到被commit的数据

                Commit策略:

               Server配置

              replica.lag.time.max.ms=10000//最大不拉取时间

              replica.lag.max.messages=4000//最大一致性差距

              Topic配置

              min.insync.replicas=1//ISR中最少的副本数

              Producer配置

request.required.acks=0

//默认值0,异步的,不需要leader给producer返回ACK.

//为了ISR的发挥一般设置为-1. 若设置为

-1,producer会在所有备份的partition收到消息时得到broker的确认,这个设置可以得到最高的可靠性保证。

        3.如何处理replica恢复

Leader只会commit ISR列表中所有partition都获取到了的消息,leader宕机后,从ISR列表中选举出新的leader,新leader继续按照与之前一样的原则维护ISR,当宕机的partition重启后,他会将之前commit的数据恢复并通过拉取数据来追赶leader,直到重新加入ISR。

 

在上图的情况中,leaderA在commit M3之前就宕机了,M3没有被ISR列表中的replica 从机拉取到,于是M3就丢失了,这是kafka可能会造成数据丢失的情形。但是M3从来没有被leader commit过,因此producer并不会认为自己发送的M3被成功接收了,它会retry重新发送M3,直到达到retry次数上限。若达到上限M3还没有被commit,那M3可能就真的丢失了。


 

4.如何处理replica全部宕机:

Kafka提供用户两种选择,可以通过配置来改变,默认第二种

    等待ISR中任一Replica恢复,并选它为Leader

   等待时间较长,降低可用性

           或ISR中的所有Replica都无法恢复或者数据丢失,则该Partition将永不可用

       选择第一个恢复的Replica为新的Leader,无论它是否在ISR中(默认配置)

  并未包含所有已被之前Leader Commit过的消息,因此会造成数据丢失

    可用性较高

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐