文章目录

1. kafka概念

在这里插入图片描述

kafka的优点

kafka被称为无限堆积,具有超高的吞吐量和数据量扩展性。因此可以用作流处理平台,用作消息中间件、日志收集等等。由于无限堆积,不需要像其他MQ消费完还得删掉,它可以保存一段时间。

《kafka权威指南》
kafka名称的来历:作者也是学生时代喜欢kafka的文学作品,而且这个名字挺酷的

如何实现高并发的:通过分区,因为分区是kafka的基本存储单元。主题和分区都是提供给上层用户的抽象,而在副本层面或更加确切地说是Log层面才有实际物理上的存在。同一个分区中的多个副本必须分布在不同的broker中,这样才能提供有效的数据冗余

一个topic作为一个业务场景
为啥要有分区?为了高并发
为啥要有副本?为了分区的高可用,副本有leader、replica,ISR(in-sync replica)同步中的副本,可参与选主,OSR(Out-sync replica)需要努力追赶leader,追上后进入ISR
topic、partition、replica的关系如下
broker是怎么知道topic、分区、副本的?通过zk

生产一般使用消费者组,但是为了保证互不影响,采用group前缀+随机数的方式生成groupId,各自独立消费,比如业务隔离或者环境隔离;如果需要协作消费,可以搞一个group

如何保证消息的顺序性?分区消费,分区是消息的物理存储单位,同一个分区内,消息是有顺序的

和rocketmq类似,都有broker,不同的是,rocketmq通过nameserver进行调度,而kafka依赖zookeeper进行调度。

producer

生产者,将消息发布到kafka的topic,broker接收到生产者发送的消息,将消息追加到用于追加数据的segment文件中,生产者发送的消息,存储到一个partition中,生产者可指定partition

consumer

从broker中读取数据,可消费多个topic中的数据

topic

使用一个类别属性来划分数据的所属类,划分数据的类称为topic。kafka看做数据库的话,topic可理解为一张表,名字就是表名。

partition

topic中的数据分割为多个partition,每个topic至少一个partition,每个partition中的数据使用多个segment文件存储。partition中的数据是有序的。partition之间的数据丢失了数据的顺序。如果topic有多个partition,消费数据时不能保证数据的顺序,在严格保证消费顺序的场景下,需要将partition设置为1个

partition offset

每条消息都有一个当前partition下唯一的64字节的offset,指明这条消息的起始位置

replicas of partition

副本是一个分区的副本,只用于防止数据丢失。

broker

kafka集群有多个服务器,服务器节点称为broker,broker存储topic的数据。如果某个topic有N个partition,集群有N个broker,则每个broker出处该topic的一个partition。如果某topic有N个partition,集群有(N+M)个broker,则有N个broker存储该topic的一个partition,剩下的M个broker不存储该topic的partition数据。如果某topic有N个partition,集群中broker数量少于N个,则一个broker存储该topic的一个或多个partition。实际生产环境需避免该情况发生,容易导致kafka集群数据不均衡

leader

每个partition有多个副本,其中只有一个座位leader,负责数据读写

follower

跟随leader,所有写请求通过leader路由,数据变更会广播给所有follower,follower和leader保持数据同步。如果leader失效,从follower中选举新的leader、

当follower与leader挂掉、卡住或同步太慢,leader会把该follower从"in sync replicas"(ISR)列表中删除,重新创建一个follower

zookeeper

负责维护和协调broker。broker新增或故障失效,由zk通知生产者和消费者。生产者和消费者根据zk的broker状态和broker协调数据的发布和订阅

AR(Assigned Replicas)

分区中所有的副本统称为AR

ISR(In-Sync-Replicas)

所有与leader部分保持一致的副本(包含leader副本)组成ISR

OSR(Out-of-Sync-Replicas)

与leader副本同步滞后太多的副本

HW(High Watermark)

高水位,标识了一个特定的offset,消费者只能拉取到这个offset之前的消息

LEO(Log End Offset)

日志末端位移,记录了该副本底层日志log中下一条消息的位移值,也就是说,如果LEO=10,说明该副本存了10条消息,位移值范围是[0,9]

HW LEO offset的关系

如在副本数为3,A为leader B、C为follower,BC会实时拉取A的消息,HW代表A、B、C同时达到的日志位移,也就是ABC的LEO的最小值,由于延时的问题,HW<=LEO

2. 安装部署

单节点安装

java环境

采用jdk1.8,已安装

zookeeper安装

Kafka通过Zookeeper来实施对元数据信息的管理,包括集群、主题、分区等内容

未找到版本依赖对应关系,暂时用最新版的zookeeper,下载页3.6.3 latest stable release

http://zookeeper.apache.org/releases.html

下载后上传到204虚机,然后在/opt目录下

tar -zxvf apache-zookeeper-3.6.3-bin.tar.gz;
mv apache-zookeeper-3.6.3-bin zookeeper;

配置文件的修改

cp zoo_sample.cfg zoo.cfg
vi zoo.cfg

其他不用变,新增两个目录:
# 数据目录
dataDir=/opt/zookeeper/data
# 日志目录
dataLogDir=/opt/zookeeper/log

启动zk

cd ../bin;
sh zkServer.sh start
kafka的安装

官网下载安装解压缩:http://kafka.apache.org/downloads

最新2.8,推荐2.13版本的Scala对应的版本

解压启动

sh kafka-server-start.sh ../config/server.properties

配置中关键参数

  • broker.id=0 表示broker的编号,如果集群中有多个broker,则每个broker的编号需要设置的不同
  • listeners=PLAINTEXT://:9092 brokder对外提供的服务入口地址
  • log.dirs=/tmp/kafka/log 设置存放消息日志文件的地址
  • zookeeper.connect=localhost:2181 Kafka所需Zookeeper集群地址

验证:

[root@rocketmq-nameserver1 ~]# jps
1794 Kafka
1556 QuorumPeerMain   //为zookeeper的
2218 Jps
kafka测试消费和生产
创建主题
sh kafka-topics.sh --zookeeper localhost:2181 --create --topic heima --partitions 2 --replication-factor 1

--zookeeper:指定了Kafka所连接的Zookeeper服务地址
--topic:指定了所要创建主题的名称
--partitions:指定了分区个数
--replication-factor:指定了副本因子
--create:创建主题的动作指令

展示所有主题
sh kafka-topics.sh --zookeeper localhost:2181 --list
查看主题详情
sh kafka-topics.sh --zookeeper 192.168.200.204:2181 --describe --topic heima
消费端订阅消息
sh kafka-console-consumer.sh --bootstrap-server 192.168.200.204:9092 --topic heima

--bootstrap-server 指定了连接Kafka集群的地址
--topic 指定了消费端订阅的主题

生产端发送消息
sh kafka-console-producer.sh --broker-list 192.168.200.204:9092 --topic heima

--broker-list 指定了连接的Kafka集群的地址
--topic 指定了发送消息时的主题

集群搭建

还是在203、204、205三台虚拟机中搭建

zk安装
vim zoo.cfg

dataDir=/opt/zookeeper/data
dataLogDir=/opt/zookeeper/log
#server.服务器ID=服务器IP地址:服务器之间通信端口:服务器之间投票选举端口
server.1=192.168.200.203:2888:3888
server.2=192.168.200.204:2888:3888
server.3=192.168.200.205:2888:3888

将zk加入环境变量

vim /etc/profile
export PATH=$PATH:/opt/zookeeper/bin

source /etc/profile

创建serverId

在指定的dataDir也就是data目录下,创建myid文件,对应的数字

问题:搭建集群时,204总是有问题,sh zkServer.sh status发现是standalone

解决:之前单节点安装的时候用过,client port 2181可能被占用,需要把204的改为2182

kafka集群搭建

server.properties

203:

# broker 编号,集群内必须唯一
broker.id=1
# host 地址
host.name=192.168.200.203
# 端口
port=9092
# 消息日志存放地址
log.dirs=/root/kafka/log/cluster/log1
# ZooKeeper 地址,多个用,分隔
zookeeper.connect=192.168.200.203:2181,192.168.200.204:2182,192.168.200.205:2181

204:

# broker 编号,集群内必须唯一
broker.id=2
# host 地址
host.name=192.168.200.204
# 端口
port=9092
# 消息日志存放地址
log.dirs=/root/kafka/log/cluster/log2
# ZooKeeper 地址,多个用,分隔
zookeeper.connect=192.168.200.203:2181,192.168.200.204:2182,192.168.200.203:2181

205:

# broker 编号,集群内必须唯一
broker.id=3
# host 地址
host.name=192.168.200.205
# 端口
port=9092
# 消息日志存放地址
log.dirs=/root/kafka/log/cluster/log3
# ZooKeeper 地址,多个用,分隔
zookeeper.connect=192.168.200.203:2181,192.168.200.204:2182,192.168.200.205:2181

启动

sh kafka-server-start.sh -daemon ../config/server.properties &
验证:
# 在203上执行
sh kafka-topics.sh --create --zookeeper 192.168.200.203:2181,192.168.200.204:2182,192.168.200.205:2181 --replication-factor 3 --partitions 3 --topic test

# 在204上执行
sh kafka-topics.sh --zookeeper 192.168.200.203:2181,192.168.200.204:2182,192.168.200.205:2181 --describe --topic test

docker-compose安装部署

1. docker-compose的安装

定义和运行多个docker应用的工具,yaml文件配置好容器参数之后,一条命令管控所有docker容器的启停。

cd /usr/local/bin
wget https://github.com/docker/compose/releases/download/v2.3.0/docker-compose-linux-x86_64
mv docker-compose-linux-x86_64  docker-compose
chmod +x docker-compose
docker-compose --version
2. yaml文件定义kafka、zk、kafka-eagle

ke为kafka-eagle的缩写,阿里开源的kafka UI工具

#docker-compose up -d 启动
version: '3'
services:
    zookeeper:
        image: zookeeper:3.4.13
    eagle:
        image: gui66497/kafka_eagle
        container_name: ke
        restart: always
        depends_on:
            - kafka-1
            - kafka-2
        ports:
            - "10907:8048"
        environment:
            ZKSERVER: "zookeeper:2181"
    kafka-1:
        container_name: kafka-1
        image: wurstmeister/kafka:2.12-2.2.2
        ports:
            - 10903:9092
        environment:
            KAFKA_BROKER_ID: 1 
            HOST_IP: 192.168.200.129
            KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
            #docker部署必须设置外部可访问ip和端口,否则注册进zk的地址将不可达造成外部无法连接
            KAFKA_ADVERTISED_HOST_NAME: 192.168.200.129
            KAFKA_ADVERTISED_PORT: 10903 
        volumes:
            - /etc/localtime:/etc/localtime
        depends_on:
            - zookeeper           
    kafka-2:
        container_name: kafka-2
        image: wurstmeister/kafka:2.12-2.2.2
        ports:
            - 10904:9092
        environment:
            KAFKA_BROKER_ID: 2 
            HOST_IP: 192.168.200.129
            KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
            KAFKA_ADVERTISED_HOST_NAME: 192.168.200.129
            KAFKA_ADVERTISED_PORT: 10904 
        volumes:
            - /etc/localtime:/etc/localtime
        depends_on:
            - zookeeper
3. docker-compose对kafka容器的启停
# 部署并后台启动
docker-compose -f km.yml up -d

# 列出 Compose 应用中的各个容器 输出内容包括当前状态、容器运行的命令以及网络端口
docker-compose ps
# 停止并删除运行中的 Compose 应用 会删除容器和网络,但是不会删除卷和镜像
docker-compose -f km.yml down
4. eagle的使用
 http://192.168.200.129:10907/ke/
 用户名/密码 admin/123456
 如果要删除topic等操作,需要管理token: keadmin

可登录控制台上面的zkCli
在这里插入图片描述

3. 生产者

1. 流程

消息发送流程
  1. 构造一个ProducerRecord对象,该对象可声明topic、partition、key和value,topic和value必须声明,partition和key可不指定
  2. 调用send()方法发消息
  3. 消息要到网络上传输,必须序列化,序列化器将msg的key和value序列化为byte
  4. 分区器进行分区,生产者知道往哪个主题和分区发送记录
  5. 该记录会被添加到一个记录批次,该批次所有消息会被发送到相同的topic和partition,有个独立的线程将这些 记录批次发送到相应的broker中
  6. broker接收到消息,表示发送成功,返回消息的元数据(包含topic、partition和offset),发送失败选择重试或抛异常
发送类型

发送即忘记

producer.send(record)

同步发送

通过send()发送完消息后返回一个Future对象,然后调用Future对象的get()方法等待kafka响应,如果响应正常,范湖一个RecordMetadata对象,该对象存储消息的偏移量;
如果kafka发生错误,无法正常响应,会抛异常,可进行异常处理

producer.send(record).get();

异步发送

producer.send(record,new Callback()){
  public void onCompletion(RecordMetadata metadata,Exception exception){
    if(exception == null){
      System.out.println(metadata.partition()+":"+metadata.offset());
    }
  }
};
序列化器

默认的:

  • 字符串序列化器(org.apache.kafka.common.serialization.StringSerializer)
  • 整型(IntegerSerializer)
  • 字节数组(BytesSerializer)序列化器

都实现了接口(org.apache.kafka.common.serialization.Serializer)

自定义序列化器:

@Override
public void configure(Map configs, boolean isKey){

}

@Override
public byte[] serialize(String topic, Company data){
  if(data == null){
    return null;
  }
  byte[] name,address;
  try{
    if(data.getName() != null){
      name = data.getName().getBytes("UTF-8");
    }else{
      name = new byte[0];
    }

    if(data.getAddress() != null){
      address = data.getAddress().getBytes("UTF-8");
    }else{
      address = new byte[0];
    }

    ByteBuffer buffer = ByteBuffer.allocate(4+4+name.length+address.length);
    buffer.putInt(name.length);
    buffer.put(address);
    return buffer.array();
  }catch (UnsupportedEncodingException e){
    e.printStackTrace();
  }
  return new byte[0];
}

@Override
public void close(){
  
}

ProducerDefineSerializer使用自定义的序列化器:

 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,CompanySerializer.class.getName());
properties.put("bootstrap.servers", brokerList);
KafkaProducer<String, Company> producer = new KafkaProducer<>(properties);
分区器

多种策略

DefaultPartitioner,kafka根据msg的key进行分区的分配,即hash(key)%numPartitions key相同被分到同一分区

源码:

int part = Utils.toPositive(nextValue) % availablePartitions.size();

RoundRobinPartitioner

实现自定义分区器,通过配置参数ProducerConfig.PARTITIONER_CLASS_CONFIG 实现

props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,DefinePartitioner.class.getNam e());

拦截器interceptor

ProducerInterceptor,kafka0.10版本引入,实现client端的定制化逻辑

发送原理

发送过程,涉及到两个线程的协作。main线程将业务数据封装为ProducerRecord对象,之后调用send()方法将消息放到RecordAccumulator(消息收集器,可认为是main线程和sender线程的缓冲区)暂存。Sender线程负责将消息构成请求,并最终执行网络IO

RecordAppendResult result = this.accumulator.append(xxx);
return result.future;

RecordAccumulator的内部还有一个BufferPool,它主要用来实现ByteBuffer的复用,由batch.size参数来指定即16KB,我们可以适当地调大batch.size参数以便多缓存一些消息

Sender 最后封装成<Node,Request>的形式,这样就可以将Request请求发往各个Node了,这里的Request是指Kafka的各种协议请求,类似dubbo底层也是Request、Response封装的请求响应模型
在这里插入图片描述

生产者参数

acks

指定分区中必须有多少个副本收到这条msg,producer才会认为写入成功,该参数涉及到消息的可靠性和吞吐量的平衡

  • ack=0 无需等待服务器响应,吞吐量最高
  • ack=1 默认值,只要集群的leader收到即可响应成功
  • ack=-1或者all 所有ISR节点都收到才会响应成功

注意:为string类型,而非int

retries

prod 从server中收到的错误可能是临时性的错误,如果达到retries设置的次数,prod会放弃重试返回错误。默认重试之间的wait time为100ms,通过retry.backoff.ms参数修改该时间间隔

batch.size

多个消息发送到同一partition,prod将其放入同一batch,该参数指定一个batch可使用的内存大小,按字节计算。不过,并非一定要等batch填满才发送,可能一个msg就发送了。如果设置太小,可能会频繁发送msg增加开销

max.request.size

用于控制prod发送的request的大小,指定能发送的单个msg的max,也可指单个req的所有msg的总大小。broker也有对应的可接收msg的max限制message.max.size,最好匹配,避免msg被broker拒绝

4. 消费者

消费者是消费组的一部分,多个消费者形成一个消费组来消费topic,每个consumer收到不同的分区的msg。假设topic为T1,有4个partition,有个消费组G1,只有1个消费者C1,则C1会收到4个partition的msg。

每个消费者都有一个对应的消费组。当消息发布到主题后,只会被投递给订阅它的每个消费组中的一个消费者。

两种消息投递方式

  • 点对点 p2p 所有消费者属于同一个group
  • 发布订阅 pub/sub 每个消费者都有自己的group

Kafka中的消费是基于拉模式的。poll()方法里还有一个超时时间参数timeout

poll()涉及消费位移、消费者协调器、组协调器、消费者的选举、分区分配的分发、再均衡的逻辑、心跳等内容

4.1 消息接收

4.1 参数配置
// 消费者隶属的消费组,具体业务含义
props.put("group.id", groupId);
// 对应的客户端id,默认为空,不设置会自动生成一个非空字符串
props.put("client.id", "consumer.client.id.demo");
4.1.2 订阅topic和分区
consumer.subscribe(Arrays.asList(topic));

可以使用正则表达式来匹配多个主题,而且订阅之后如果又有匹配的新主题,那么这个消费组会立即对其进行消费

consumer.subscribe(Pattern.compile("heima*"));

指定订阅的分区

consumer.assign(Arrays.asList(new TopicPartition("topic0701", 0)));

4.2 位移提交

kafka的两种offset:

  • 分区中,每条消息都有唯一的offset,表示消息在分区中的对应位置,称为偏移量
  • 消费者使用offset表示消费到分区中某个消息所在的位置,称为位移,更明确的是“消费位移”,消费位移存在kafka内部的主题_consumer_offsets

kafka中,消费者自己管理消费的位移,把消费位移存储起来(持久化)的动作,称为“提交”,消费者消费完消息后,需要执行消费位移的提交。

当前消费者需要提交的位移并不是x,而是x+1,表示下一条需要拉取的消息的位置。

消息丢失

比如poll拉取到的消息集是[x+2,x+7],然后x+5是正在处理的位置,如果拉取到消息之后就进行位移提交,也就是提交了x+8,那如果x+5遇到异常,故障恢复后,重新拉取的消息就是从x+8开始,x+5到x+7就丢失了。

重复消费

如果在消费完所有消息后再做位移提交,如果x+5正在处理的位置,故障恢复后,重新拉取是从x+2开始的。也就是说x+2到x+4又重新消费了一遍。

当然,实际情况有可能更加复杂。

自动提交

enable.auto.commit设置为true,消费者会在poll()方法调用后定时(auto.commit.interval.ms)提交一次位移。

同样有重复消费的问题

同步提交

可以优化为批量处理+批量提交,出现异常进行捕获后记录日志补偿

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(1000);
    if (records.isEmpty()) {
    	break;
    }
    List<ConsumerRecord<String, String>> partitionRecords = records.records(tp);
    lastConsumedOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
    consumer.commitSync();//同步提交消费位移
 }
异步提交

手动提交的缺点:发起提交调用时,应用会阻塞。可使用异步提交的api

异步提交的缺点:提交后,如果server返回提交失败,异步提交不会重试。

consumer.commitAsync(new OffsetCommitCallback(){
	public void onComplete(){
		...
	}
})

4.3 指定位移消费

消息拉取根据poll()方法的逻辑处理,无法掌握其消费的起始位置。

seek()方法提供该功能,追踪以前的消费或回溯消费

// timeout参数
consumer.poll(Duration.ofMillis(2000));
// 获取消费者所分配到的分区
Set<TopicPartition> assignment = consumer.assignment();
for (TopicPartition tp : assignment) {
// 参数partition表示分区,offset表示指定从分区的哪个位置开始消费
consumer.seek(tp, 10);
}

//consumer.seek(new TopicPartition(topic,0),10);

4.4 再均衡监听器

再均衡是指分区的所属从一个consumer转移到另一个consumer,它为consumer group具备高可用和伸缩性提供保障,使得可方便和安全的删除消费组内的consumer或往group添加consumer。不过rebalance期间无法拉取消息

有个ConsumerRebalanceListener 在再均衡发生期间,消费组内的消费者是无法读取消息的

consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    // 劲量避免重复消费
    consumer.commitSync(currentOffsets);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    //do nothing.
    }
});

4.4 消费者拦截器

消费到消息或提交位移时的个性化操作

props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,ConsumerInterceptorTTL.class.getName());

4.5 消费者参数补充

fetch.min.bytes

消费者指定从broker中读取到消息的最小的数据量。如果数据量小于这个阈值,broker会等待足够的数据,再返回给consumer,减轻broker压力

fetch.max.wait.ms

指定consumer读取时的最长等待时间,避免长时间阻塞,默认500ms

max.partition.fetch.bytes

指定每个分区返回的最多字节数,默认1M。实际可能需要更多空间

max.poll.records

控制一个poll()调用返回的记录数,控制在拉取循环中的处理数据量

5. topic

5.1 管理

5.1.1 创建主题

./kafka-topics.sh --zookeeper 192.168.200.204:2181 --create --topic heima2 --partitions 2 --replication-factor 1

  • zookeeper为必传参数,多个zk用","分开
  • partition设置主题分区数,每个线程处理一个分区数据
  • replication-factor 设置主题副本数,每个副本分布在不同节点,不能>总的节点数

查看topic元数据信息的方法

在zk节点中,./zkCli.sh -server 192.168.200.204:2181,进入后,get /broker/topics/heima2

返回{"partitions":{"0":[1],"1":[1]},"topic_id":"zyWSTxTiR-eMffVtx4JLYQ","adding_replicas":{},"removing_replicas":{},"version":3}

5.1.2 查看主题

./kafka-topics.sh --list --zookeeper 192.168.200.204:2181

// 查看特定主题:
./kafka-topics.sh --describe --zookeeper 192.168.200.204:2181 --topic heima2

Topic: heima2	TopicId: zyWSTxTiR-eMffVtx4JLYQ	PartitionCount: 2	ReplicationFactor: 1	Configs: 
	Topic: heima2	Partition: 0	Leader: 1	Replicas: 1	Isr: 1
	Topic: heima2	Partition: 1	Leader: 1	Replicas: 1	Isr: 1
//查看正在同步的主题
./kafka-topics.sh --describe --under-replicated-partitions
5.1.3 修改主题
//增加配置
./kafka-topics.sh --alter --zookeeper 192.168.200.204:2181 --topic heima2 --config flush.messages=1

//删除配置
./kafka-topics.sh --alter --zookeeper 192.168.200.204:2181 --topic heima2 --delete-config flush.messages
5.1.4 删除topic

如果delete.topic.enable=true,直接彻底删除该topic。

./kafka-topics.sh --delete  --zookeeper 192.168.200.204:2181 --topic heima2

Topic heima2 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

//标记为marked for del
./kafka-topics.sh --list --zookeeper 192.168.200.204:2181 

__consumer_offsets
heima
heima-par
myTopic

5.2 增加分区

./kafka-topics.sh --alter --zookeeper 192.168.200.204:2181 --topic heima --partitions 3

WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!

//减少不行
./kafka-topics.sh --alter --zookeeper 192.168.200.204:2181 --topic heima --partitions 2

Error while executing topic command : The number of partitions for a topic can only be increased. Topic heima currently has 3 partitions, 2 would not be an increase.
[2021-11-11 00:17:46,400] ERROR org.apache.kafka.common.errors.InvalidPartitionsException: The number of partitions for a topic can only be increased. Topic heima currently has 3 partitions, 2 would not be an increase.

5.4 kafkaAdminClient应用

将某些管理查看的功能集成到第三方(如kafka Manager)中,需要调用API操作kafka
以addPartition为例

public static void addTopicPartitions() throws ExecutionException, InterruptedException {
    String brokerList =  "192.168.200.204:9092";
    String topic = "heima";

    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
    props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
    AdminClient client = AdminClient.create(props);

    NewPartitions newPartitions = NewPartitions.increaseTo(5);
    Map<String, NewPartitions> newPartitionsMap = new HashMap<>();
    newPartitionsMap.put(topic, newPartitions);
    CreatePartitionsResult result = client.createPartitions(newPartitionsMap);
    result.all().get();

    client.close();
}

6. 分区

1. 副本机制

由于prod和consumer都只和leader角色的分区副本相连,其余副本只为保证高可用。一个分区可有多个副本,保存在不同的broker中,当broker宕机,leader在该broker中的分区不可用,kafka自动移除leader,选其他副本的一个作为新的leader。

通常增加分区可提高集群的吞吐量。但总分区数或单台server的分区数过多,增加不可用和延迟的风险。

2. 分区leader选举

不是采用多数选举的方式进行leader的选举,而是在zk上维护每个topic的ISR(in-sync replica,已同步的副本)的集合。只有该列表中的才有资格成为leader。kafka需要的冗余度较低,比如某个topic有x+1个副本,可容忍x个不可用。如果ISR的都不可用,也可选择其他可用的,只是数据不一致。

3. 分区重新分配

往kafka集群添加机器后,新添加的节点并不会自动分配数据,需要重新分配。

./kafka-topics.sh --create --zookeeper localhost:2181 --topic heima-par1 --partitions 3 --replication-factor 3 

# 先扩充分区为4个
./kafka-topics.sh --alter --zookeeper localhost:2181 --topic heima-par1 --partitions 4

vi reassign.json

{"topics":[{"topic":"heima-par1"}],"version":1}

kafka-reassgin-partitions.sh生成reassgin plan

./kafka-reassign-partitions.sh --zookeeper 192.168.200.204:2181 --topics-to-move-json-file reassign.json --broker-list "1,2,3,4" --generate
# --generate指定类型参数
# --topics-to-move-json-file 指定分区重分配对应的topic清单

注意:
命令输入两个json字符串,第一个json为当前的分区副本分配情况,第二个为重分配的候选方案,并未执行操作

将第二个json保存到名为result.json文件中

执行分配策略

./kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file result.json --execute 

# 查看分配进行情况
./kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file result.json --verify

# 尴尬了,并未生效...

参考源码:org.apache.kafka.clients.consumer.RangeAssignor

int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();
for(int n = consumersForTopic.size(); i < n; ++i) {
        int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
        int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
       ((List)assignment.get(consumersForTopic.get(i))).addAll(partitions.subList(start, start + length));
}

原理:按照consumer总数和partition总数进行整除获得一个跨度,再将partition按跨度进行平均分配,以保证尽可能将分区均匀分给consumer。
对于每个topic,RangeAssignor将consumerGroup的所有订阅该topic的consumer按name的字典排序,再为每个consumer划分固定的range,如果不够,字典序列靠前的consumer会多分配一个partition。

假设消费组内有2个消费者c0和c1,都订阅topic t0和t1,且每个topic都有4个partition,则所有分区为:t0p0、t0p1、t0p2、t0p3、t1p0、t1p1、t1p2、t1p3。分配结果为:

  • c0: t0p0、t0p1、t1p0、t1p1
  • c1: t0p2、t0p3、t1p2、t1p3

假设2个topic都有3个partition,所有partition为:t0p0、t0p1、t0p2、t1p0、t1p1、t1p2,分配结果为:

  • c0: t0p0、t0p1、t1p0、t1p1
  • c1: t0p2、t1p2

分配并不均匀,如果类似情况扩大,可能部分consumer过载。

RoundRobinAssignor分配

将consumerGroup内所有consumer和consumer订阅的topic的partition按照字典排序,然后轮询给每个consumer。

stickyAssignor分配策略

0.11.x版本引入,目的:

  • 分区尽可能分配均匀
  • 分区的分配尽可能与上次分配保持相同
自定义分配策略

实现 PartitionAssignor

7. kafka存储

1. 存储结构

kafka的消息以topic为基本单位进行归类,每个topic又分为一个或多个partition,分区的num可在topic创建时指定,或之后修改。每个msg发送时根据partition规则被追加到指定partition,partition中每个msg被分配一个唯一的序列号,也即是offset,结构如下图
在这里插入图片描述
segment文件的命名规则:partition全局的第一个segment从0开始,之后每个segment文件name为上一个segment文件最后一个msg的offset值。

每个 LogSegment 都有一个基准偏移量 baseOffset,用来表示当前 LogSegment中第一条消息的offset。日志文件和索引文件都根据backOffset命名,数值最大为64位long大小,固定20位数字,没有数字用0填充。

log对应命名形式为topic-partition的文件夹,如topic为myTopic,有4个分区,对应myTopic-0、myTopic-1、myTopic-2、myTopic-3这4个文件夹。
往log中追加消息是顺序写入的,只有最后一个LogSegment才能执行写入,称为activeSegment。
在这里插入图片描述
kafka文件目录布局如下
在这里插入图片描述

2. 日志索引

数据文件分段

将数据文件分段,如100条msg,offset从0到99,将数据文件分为5段,每段放在单独数据文件中,数据文件以该段最小的offset命名,可二分查找定位该msg在哪个段中。

偏移量索引

分段后,仍需要顺序扫描找到对应offset的msg,为进一步提高效率,搞了个.index文件,索引文件。

因此,msg存储采用分区partition、分段logSegment和稀疏索引提高效率。

3. 日志清理

两种清理策略

  • 日志删除log retention
  • 日志压缩log compaction 针对每个msg的key整合,只保留最后一个value版本

但是好像没看到压缩相关的配置…

4. 磁盘存储

顺序写盘,os对线性读写的优化,如预读read-ahead和后写write-behind技术。而且比随机写内存快,因此性能较高。

页缓存,是os实现的一种主要的磁盘缓存,减少对磁盘io的操作。其实就是将磁盘数据缓存到内存。
进程准备读取磁盘文件时,os会先查看待读取的数据所在的page是否存在pagecache,如果有,直接返回数据,避免物理磁盘的IO。对进程而言,在进程内部缓存处理所需数据,但这些数据可能还缓存在os的页缓存,除非使用direct IO。此外,java进程对象的内存开销很大,且垃圾回收随堆内数据的增多而变得越来越慢。而且,即使kafka服务重启,页缓存也有效,而进程内的缓存需要重建。因此,使用页缓存,是kafka实现高吞吐的重要因素之一。

此外,kafka还使用零拷贝zero_copy技术进一步提升性能。数据直接从磁盘文件复制到网卡设备,不经由应用程序,减少内核和用户模式之间的上下文切换。

3. 控制器

kafka集群有一个或多个broker,其中一个会被选举为controller,负责管理整个集群的所有分区和副本的状态。当某个分区的leader副本出现故障,有controller负责为该分区选举新的leader副本。当检测到某个分区的ISR集合发生变化,也是controller负责通知所有broker更新其元数据信息。当使用kafka-topics.sh为某个topic增加分区num,也是controller负责分区的重新分配。

controller选举的工作依赖于zk,成功竞选的broker会在zk中创建/controller这个临时节点。
采用ZooInspector查看
在这里插入图片描述
如图,brokerid为controller的broker的id编号,timestamp是竞选为controller时的时间戳。
任意时刻,集群中有且只有一个controller,每个broker启动时都去读取/controller的brokerid的值,如果zk不存在该节点,或者数据异常,就尝试创建,只有创建成功的broker会成为controller。
在这里插入图片描述

zk中还有个/controller_epoch节点,是持久节点。用于记录controller变更的次数,称为“控制器的纪元”。初始为1,每次controller变更,选出新的controller该字段+1。
作为controller的broker的职责:

  • 监听partition变化
  • 监听topic变化
  • 监听broker变化

4. 可靠性保证

确保系统在各种不同的环境能发生一致的行为

kafka的保证:

  • 保证分区消息的顺序,通过offset保证消息的顺序
  • 只有消息被写入分区所有同步副本时,才被认为是已提交。生产者可选择接受不同类型的确认,控制参数为acks
  • 只要还有一个副本是active,则已提交的消息不会丢失
  • 消费者只能读取已提交的消息

失效副本

怎么判定一个分区是否有副本是处于同步失效状态?参数replica.log,time.max.ms控制,默认1w,当follower滞后leader的time超过该值,就判定副本失效,剔除ISR。

具体原理:当follower副本将leader副本的LEO(Log End Offset,每个分区最后一个msg的位置)之前的日志都同步后,认为已追上leader副本,更新该副本的lastCaughtUpTimeMs标识。kafka的副本管理器ReplicaManager启动时会启动定时任务进行副本过期检测,看当前time和副本的lastCaughtUpTimeMs差值是否大于该参数。

副本复制

kafka的每个topic的partition都被复制了n次,n为topic的复制因子(replication factor),保证kafka在集群server发生故障时自动切换副本。以分区为粒度,follower只按顺序从leader上复制日志。

5. 一致性保证

leader宕机后,只能从ISR中选取新的leader,都知道HW之前的数据,保证切换leader后,consumer可继续看到HW之前已提交的数据。

HW的截断机制

选举出新的leader后,新的leader不能保证完全同步之前leader的所有数据,只能保证HW之前的数据是同步过的。此时,所有follower都要将数据截断到HW的位置,再跟新leader同步数据,保证数据一致性。当宕机的leader恢复后,也要将自己的数据截断到宕机之前的HW位置,然后同步新的leader的数据。

leader epoch引用

由于HW值异步延迟,需要额外的fetch请求处理才能更新,可能过期,造成数据不一致。比如leader副本发生切换,发生数据丢失或leader副本和follower副本数据不一致的问题。

数据丢失:
在这里插入图片描述
数据不一致的
在这里插入图片描述

因此,引入leader epoch取代HW值。leader端内存保存leader的epoch信息,为一对值(epoch, offset)

  • epoch表示leader的版本号,从0开始,leader变更一次epoch+1
  • offset对应该版本leader写入第一条msg的位移

leader broker保存该缓存,定期写入checkpoint文件。
在这里插入图片描述

6. _consumer_offsets

内部topic,保存kafka的consumer的位移信息,有consumer消费消息时自动创建该topic,分区数可有offsets.topic.num.partitions指定,默认50

8. springboot集成

更加轻便

配置文件

pom.xml

<dependency>
   <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.6.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

application.properties

spring.kafka.producer.bootstrap-servers=192.168.200.204:9092
spring.kafka.consumer.bootstrap-servers=192.168.200.204:9092
spring.kafka.producer.transaction-id-prefix=kafka_tx.

消息发送

@Autowired
private KafkaTemplate template;
private static final String topic = "heima";

@Transactional
@GetMapping("/send/{input}")
 public String sendToKafka( @PathVariable String input){
     template.send(topic,input);
     return "send successfully!"+input;
 }

消息接收

@KafkaListener(id="", topics = topic, groupId = "group.demo")
public void listen(String msg){
    logger.info("input value:{} ",msg);
}

9. 调优

网络和io操作线程配置优化

broker处理消息的最大线程数(默认3):

  • num.network.threads=cpu核数+1

broker处理磁盘io的线程数:

  • num.io.threads=cpu核数*2

log数据文件刷盘策略

每当producer写入1w条消息时,刷数据到磁盘

  • log.flush.interval.messages=10000

每隔1s,刷数据到磁盘

  • log.flush.interval.ms=1000

日志保留策略配置

保留3天,也可更短

  • log.retention.hours=72

段文件配置1GB,利于快速回收磁盘空间,重启kafka加载也会加快(如果文件过小,文件数目比较多,kafka启动时是单线程扫描目录下所有数据文件)

  • log.segment.bytes=1073741824

调优vm.max_map_count参数

适用于kafka broker上的主题数超多的情况,kafka日志段的索引文件是用映射文件的机制做的,如果有超多日志段,索引文件数太多,容易打爆资源限制,需要适当调大该参数

jvm参数

至少java9,堆大小6-10G足够。

配置参数

message.max.bytes,默认消息大小最大976.6kb,如果更改,也要同步更改max.request.size 客户端的和max.message.bytes topic端的参数

偏移量offset重置策略

  • earliest: 如果消费者组尚未在分区中存储偏移量(即新的消费者组或者消费者组首次消费特定分区),则从该分区的最早偏移量开始消费。这意味着消费者将从分区的起始处开始消费消息,即使之前的消息已经被消费过也会重新消费。

    • 适用场景: 适合需要消费历史所有消息或者确保不错过任何消息的情况,比如消费者应用启动时或者需要重新处理历史数据时。
  • latest: 如果消费者组尚未在分区中存储偏移量,或者指定了重置偏移量为 latest,消费者将从分区的最新偏移量开始消费。这意味着消费者只会消费自它加入消费者组后发送到分区的新消息。

    • 适用场景: 适合处理实时数据,只关心加入消费者组后产生的新消息,不需要处理之前的历史消息。

为了帮助broker更快定位到指定的偏移量,Kafka为每个分区维护了一个索引。该索引将偏移量与片段文件以及偏移量在文件中的位置做了映射
类似地,Kafka还有第二个索引,该索引将时间戳与消息偏移量做了映射。在按时间戳搜索消息时会用到这个索引。这种搜索方式在Kafka Streams中使用广泛

kafka的日志消息格式

大致可以看成header和body,header里面包含offset和size,body里面除了value、value length还有crc32校验、magic版本号、attributes属性记录压缩类型等。

v1在body里面增加了timestamp,可以干啥?对内日志保存、切割,对外消息审计、大数据应用等扩展功能。能否加到value中?不得行,解析变长的消息体会带来额外的性能开销,因此放到body。attributes字段的第四位被用作时间戳类型,0是createTime,1是logAppendTime,默认createTime,也就是生产者创建消息的时间戳。

v2版本多个消息存入RecordBatch,只需对整个RecordBatch计算一次CRC,引入了变长整型varints节省空间。

V0版本

其实也可以看作head 和body
head被称作LOG_OVERHEAD,包含offset和message size,固定8+4=12B
offset是逻辑值,而非实际物理偏移值,message size表示消息的大小,这两者在一起被称为日志头部(LOG_OVERHEAD),固定为12B
body就是record,包含如下,

  • crc32(4B):crc32校验值。校验范围为magic至value之间。
  • magic(1B):消息格式版本号,此版本的magic值为0。
  • attributes(1B):消息的属性。总共占1个字节,低3位表示压缩类型:0表示NONE、1表示GZIP、2表示SNAPPY、3表示LZ4(LZ4自Kafka 0.9.x引入),其余位保留。
  • key length(4B):表示消息的key的长度。如果为-1,则表示没有设置key,即key=null。
  • key:可选,如果没有key则无此字段。
  • value length(4B):实际消息体的长度。如果为-1,则表示消息为空。
  • value:消息体。可以为空,比如墓碑(tombstone)消息。

如果空消息,record至少14B

V1版本

V1版本多了一个timestamp字段,表示消息的时间戳
v1版本的attributes字段中的低3位和v0版本的一样,还是表示压缩类型,而第4个位(bit)也被利用了起来:0表示timestamp类型为CreateTime,而1表示timestamp类型为LogAppendTime
在这里插入图片描述
timestamp类型由broker端参数log.message.timestamp.type来配置

日志压缩:kafka一般采用端到端压缩,生产者发送的压缩数据在broker中也是保持压缩状态进行存储的,消费者从服务端获取的也是压缩的消息,消费者在处理消息之前才会解压消息

通过参数 compression.type 来配置压缩方式,默认producer,跟生产者保持一致

消息压缩时是将整个消息集进行压缩作为内层消息(inner m
essage),内层消息整体作为外层(wrapper message)的 value

V2版本

从0.11.0版本开始,参考了Protocol Buffer而引入了变长整型(Varints)和ZigZag编码

Varints是使用一个或多个字节来序列化整数的一种方法,位于BytesUtils工具类,推导出来,0~63之间的数字占1个字节,64~8191之间的数字占2个字节,8192~1048575之间的数字占3个字节,broker端配置message.max.bytes的默认大小为1000012 (Varints编码占3个字节),绝大多数情况下都会节省空间

改造:

  • crc 将 crc 的字段从 Record 中转移到了RecordBatch中,只需对整个RecordBatch计算一次CRC,提高写入性能;减少了每条消息的元数据开销,从而节省了存储空间
  • magic V2版本为2
  • attributes:消息属性,注意这里占用了两个字节。低3位表示压缩格式,可以参考v0和v1;第4位表示时间戳类型;第5位表示此RecordBatch是否处于事务中,0表示非事务,1表示事务。第6位表示是否是控制消息(ControlBatch),0表示非控制消息,而1表示是控制消息,控制消息用来支持事务功能
  • producer id:PID,用来支持幂等和事务
  • producer epoch:和producer id一样,用来支持幂等和事务
  • first sequence:和 producer id、producer epoch 一样,用来支持幂等和事务

看单条消息,空间占用比较大,消息的占用字节数=61B+14B+1B=76B,但是大批量的时候会极大的节省空间,本来应该占用740B大小的空间,实际上只占用了191B,在v0版本中这10条消息需要占用320B的空间大小,而v1版本则需要占用400B的空间大小,这样看来v2版本又节省了很多空间,因为它将多个消息(Record)打包存放到单个RecordBatch中,又通过Varints编码极大地节省了空间

如何查看日志内容:在Kafka 2.0.0之前并没有kafka-dump-log.sh脚本,所以只能使用kafka-run-class.sh kafka.tools.DumpLogSegments的形式

日志索引

采用稀疏索引,偏移量索引文件用来建立消息偏移量(offset)到物理地址之间的映射关系,方便快速定位消息所在的物理文件位置;时间戳索引文件则根据指定的时间戳(timestamp)来查找对应的偏移量信息
每当写入一定量(由broker 端参数 log.index.interval.bytes指定,默认值为4096,即4KB)的消息时,偏移量索引文件和时间戳索引文件分别增加一个偏移量索引项和时间戳索引项
都是通过二分查找进行定位。稀疏索引的方式是在磁盘空间、内存空间、查找时间等多方面之间的一个折中。

日志分段切割的条件(满足一个即可):

  • 日志分段大小大于1GB log.segment.bytes
  • 日志满7天 log.roll.hours
  • 偏移量索引文件大小达到10MB broker配置log.index.size.max.bytes
  • 追加的消息的偏移量与当前日志分段的偏移量之间的差值大于Integer.MAX_VALUE offset-baseOffset>Integer.MAX_VALUE

偏移量索引项的格式

  • 索引文件的文件名即为baseOffset的值
  • relativeOffset:相对偏移量,占用4 个字节,减小索引文件占用的空间
  • position:物理地址,也就是消息在日志分段文件中对应的物理位置,占用4个字节

Kafka 强制要求索引文件大小必须是索引项大小的整数倍,对偏移量索引文件而言,必须为8的整数倍

如果要查找偏移量为268的消息,那么应该怎么办呢?首先肯定是定位到baseOffset为251的日志分段,然后计算相对偏移量relativeOffset=268-251=17,之后再在对应的索引文件中找到不大于17的索引项,最后根据索引项中的position定位到具体的日志分段文件位置开始查找目标消息。
如何查找baseOffset 为251的日志分段的呢?用了跳跃表的结构。Kafka 的每个日志对象中使用了ConcurrentSkipListMap来保存各个日志分段,每个日志分段的baseOffset作为key,这样可以根据指定偏移量来快速定位到消息所在的日志分段。

时间戳索引的格式

  • timestamp:当前日志分段最大的时间戳 8字节
  • relativeOffset:时间戳所对应的消息的相对偏移量 4字节
    与偏移量索引文件相似,时间戳索引文件大小必须是索引项大小(12B)的整数倍,如果不满足条件也会进行裁剪

如果要查找指定时间戳targetTimeStamp=1526384718288开始的消息,首先是找到不小于指定时间戳的日志分段。这里就无法使用跳跃表来快速定位到相应的日志分段了

日志删除

有删除策略

  • 基于时间
  • 基于日志大小
    除了删除还有compact整理,类似AOF的rewrite,只保留key的最新value,然后合并日志段

基于时间:

日志删除任务先check 是否有超过阈值retentionMs来寻找可删除的日志分段文件集合deletableSegments,可以通过broker端参数log.retention.hours、log.retention.minutes和log.retention.ms来配置,默认情况下只配置了log.retention.hours参数,168,也就是7天

如何删除:首先会从Log对象中所维护日志分段的跳跃表中移除待删除的日志分段,以保证没有线程对这些日志分段进行读取操作。然后将日志分段所对应的所有文件添加上“.deleted”的后缀,最后交由一个以“delete-file”命名的延迟任务来删除这些以“.deleted”为后缀的文件,默认1分钟

基于日志大小

日志删除任务会检查当前日志的大小是否超过设定的阈值(retentionSize)来寻找可删除的日志分段的文件集合(deletableSegments)

kafka的存储优化

顺序写

顺序写磁盘的效率远高于随机写磁盘,也高于随机写内存。
采用了文件追加的方式来写入消息,操作系统对顺序写也有优化,预读(read-ahead,提前将一个比较大的磁盘块读入内存)和后写(write-behind,将很多小的逻辑写操作合并起来组成一个大的物理写操作)技术。

页缓存

减少对磁盘 I/O 的操作,对磁盘的访问变为对内存的访问。
消息的可靠性应该由多副本机制来保障,而不是由同步刷盘这种严重影响性能的行为来保障。

零拷贝

静态文件展示给用户
传统:
read从磁盘读取到内存buf
write 通过socket将buf传给用户

实际经过四次复制,很耗时

![[Pasted image 20240711064706.png]]

零拷贝的实现

![[Pasted image 20240711065538.png]]

DMA将文件内容复制到内核模式下的Read Buffer 中。不过没有数据被复制到 Socket Buffer,相反只有包含数据的位置和长度的信息的文件描述符被加到Socket Buffer中。DMA引擎直接将数据从内核模式中传递到网卡设备(协议引擎)。这里数据只经历了2次复制就从磁盘中传送出去了,并且上下文切换也变成了2次
零拷贝是针对内核模式而言的,数据在内核模式下实现了零拷贝。

生产者

producer的参数很多,常见的必填的有

  • bootstrap.servers
  • key.serializer
  • value.serializer
    其他的可参见ProducerConfig

然后就是消息,放在 ProducerRecord中,其中topic属性和value属性是必填项

![[Pasted image 20240718204134.png]]

send方法得到Future<RecordMetadata> 在RecordMetadata对象里包含了消息的一些元数据信息,比如当前消息的主题、分区号、分区中的偏移量(offset)、时间戳等。调试的时候可以查看。

序列化

生产者需要用序列化器(Serializer)把对象转换成字节数组才能通过网络发送给Kafka。而在对侧,消费者需要用反序列化器(Deserializer)把从 Kafka 中收到的字节数组转换成相应的对象

分区器

partitioner接口,对应有多个实现,多种分区策略,方法partition()是计算分区号
![[Pasted image 20240718205928.png]]

DefaultPartitioner 对key做hash计算分区号(murmurHash2算法)

自定义分区器的实现比较简单,读者也可以根据自身业务的需求来灵活实现分配分区的计算方式,比如一般大型电商都有多个仓库,可以将仓库的名称或ID作为key来灵活地记录商品信息

ProducerInterceptor
场景:不同pbx消费不同的消息
比如我们代码不好改造,就加个interceptor,去处理这个逻辑。然后消费者判断前缀是否自己的pbx

![[Pasted image 20240718212035.png]]

整体架构

如下图,主线程和Sender线程
消息在client中,先经过 interceptor、serializer、partitioner然后到消息累加器RecordAccumulator,Sender 线程负责从RecordAccumulator中获取消息并将其发送到Kafka中
RecordAccumulator 缓存的大小可以通过生产者客户端参数buffer.memory 配置,默认32MB

主要用来缓存消息以便 Sender 线程可以批量发送

![[Pasted image 20240718212308.png]]

如果生产者客户端需要向很多分区发送消息,则可以将buffer.memory参数适当调大以增加整体的吞吐量。
RecordAccumulator的内部还有一个BufferPool,它主要用来实现ByteBuffer的复用,由batch.size参数来指定即16KB,我们可以适当地调大batch.size参数以便多缓存一些消息

Sender 最后封装成<Node,Request>的形式,这样就可以将Request请求发往各个Node了,这里的Request是指Kafka的各种协议请求,类似dubbo底层也是Request、Response封装的请求响应模型

client是怎么和server交互的?

首先是元数据信息的获取。
有哪些元数据信息?
集群有哪些topic,topic有哪些partition?每个partition的leader副本在哪个broker上,follower在那些broker上?那些副本在AR ISR等集合中,集群有那些broker?controller是哪个broker这些

当client没有这些元数据信息,或者超过metadata.max.age.ms时间内没有更新元数据信息,都会触发元数据信息的更新操作,默认5min,更新元数据信息时,先挑选出leastLoadedNode,然后向这个node发送MetadataRequest请求获取具体的元数据信息。

其他常用参数

acks

  • acks=0:不等Leader将数据落到日志,Kafka直接返回完成信号给客户端
  • acks=1:等Leader将数据落到本地日志,但是不等Follower同步数据,Kafka就直接返回完成信号给客户端
  • acks=all:等Leader将数据落到日志,且等min.insync.replicas个Follower都同步数据后,Kafka再返回完成信号给客户端

retries 默认0,可能发生一些临时性的异常,比如网络抖动、leader副本的选举等,这种异常往往是可以自行恢复的
重试还和另一个参数retry.backoff.ms有关,这个参数的默认值为100

Kafka 可以保证同一个分区中的消息是有序的。如果生产者按照一定的顺序发送消息,那么这些消息也会顺序地写入分区,进而消费者也可以按照同样的顺序消费它们。一般而言,在需要保证消息顺序的场合建议把参数max.in.flight.requests.per.connection配置为1,而不是把acks配置为0,不过这样也会影响整体的吞吐。

compression.type
如果对时延有一定的要求,则不推荐对消息进行压缩。

linger.ms参数,指定生产者发送 ProducerBatch 之前等待更多消息(ProducerRecord)加入ProducerBatch 的时间,默认值为 0
与TCP协议中的Nagle算法有异曲同工之妙

消费者

每个消费者都有一个对应的消费组。当消息发布到主题后,只会被投递给订阅它的每个消费组中的一个消费者。

两种消息投递方式

  • 点对点 p2p 所有消费者属于同一个group
  • 发布订阅 pub/sub 每个消费者都有自己的group

Kafka中的消费是基于拉模式的。poll()方法里还有一个超时时间参数timeout

poll()涉及消费位移、消费者协调器、组协调器、消费者的选举、分区分配的分发、再均衡的逻辑、心跳等内容

位移提交

对于消息在分区中的位置,我们将offset称为“偏移量”;对于消费者消费到的位置,将 offset 称为“位移”,有时候也会更明确地称之为“消费位移”。

消费位移存储在Kafka内部的主题__consumer_offsets中。这里把将消费位移存储起来(持久化)的动作称为“提交”,消费者在消费完消息之后需要执行消费位移的提交。

位移提交的具体时机很有讲究,很可能出现重复消费和消费丢失的情况。

消息丢失
比如poll拉取到的消息集是[x+2,x+7],然后x+5是正在处理的位置,如果拉取到消息之后就进行位移提交,也就是提交了x+8,那如果x+5遇到异常,故障恢复后,重新拉取的消息就是从x+8开始,x+5到x+7就丢失了。
重复消费
如果在消费完所有消息后再做位移提交,如果x+5正在处理的位置,故障恢复后,重新拉取是从x+2开始的。也就是说x+2到x+4又重新消费了一遍。

当然,实际情况有可能更加复杂。
kafka默认自动提交,enable.auto.commit true; auto.commit.interval.ms默认5s

为解决这些问题,建议手动提交。精确管理位移操作,消息做好业务处理后再进行位移提交

  • 同步提交 commitSync() 还可以优化为批量处理+批量提交,出现异常进行捕获后记录日志补偿
  • 异步提交 commitAsync() 消费者线程不会阻塞,提高性能,异步+同步保证最后的offset正确

![[Pasted image 20240720114943.png]]

指定位移消费

比如消费者组刚创建,或者订阅一个新的新主题,没有可以查找的消费位移。根据auto.offset.reset的配置决定从哪里开始消费,默认latest

再均衡

有个ConsumerRebalanceListener,指分区的所属权从一个消费者转移到另一消费者的行为,它为消费组具备高可用性和伸缩性提供保障。缺点:在再均衡发生期间,消费组内的消费者是无法读取消息的;如果消费者消费完某个分区中的一部分消息时还没有来得及提交消费位移就发生了再均衡操作,之后这个分区又被分配给了消费组内的另一个消费者,原来被消费完的那部分消息又被重新消费一遍,也就是发生了重复消费

如果消费者客户端中配置了两个分配策略,那么以哪个为准呢?如果有多个消费者,彼此所配置的分配策略并不完全相同,那么以哪个为准?多个消费者之间的分区分配是需要协同的,那么这个协同的过程又是怎样的呢?这一切都是交由消费者协调器(ConsumerCoordinator)和组协调器(GroupCoordinator)来完成的,它们之间使用一套组协调协议进行交互。
GroupCoordinator是Kafka服务端中用于管理消费组的组件。而消费者客户端中的ConsumerCoordinator组件负责与GroupCoordinator进行交互。
ConsumerCoordinator与GroupCoordinator之间最重要的职责就是负责执行消费者再均衡的操作,包括前面提及的分区分配的工作也是在再均衡期间完成

触发再均衡的操作:
· 有新的消费者加入消费组。
· 有消费者宕机下线。消费者并不一定需要真正下线,例如遇到长时间的 GC、网络延迟导致消费者长时间未向GroupCoordinator发送心跳等情况时,GroupCoordinator会认为消费者已经下线。
· 有消费者主动退出消费组(发送 LeaveGroupRequest 请求)。比如客户端调用了unsubscrible()方法取消对某些主题的订阅。
· 消费组所对应的GroupCoorinator节点发生了变更。
· 消费组内所订阅的任一主题或者主题的分区数量发生变化。

消费者拦截器

consumerIntercepter
对消息进行相应的定制化操作,比如修改返回的消息内容、按照某种规则过滤消息(可能会减少poll()方法返回的消息的个数)。如果 onConsume()方法中抛出异常,那么会被捕获并记录到日志中,但是异常不会再向上传递。

多线程

一个consumer对应多个handler去处理业务

重要参数

fetch.min.bytes 默认1B,调大可能会有延迟
fetch.max.bytes 默认50MB
fetch.max.wait.ms 默认500ms
max.poll.records 一次拉去多少条,默认500
.isolation.level 消费者的事务隔离级别 如果设置为“read_committed”,那么消费者就会忽略事务未提交的消息,即只能消费到 LSO(LastStableOffset)的位置,默认情况下为“read_uncommitted”,即可以消费到HW(High Watermark)处的位置

主题和分区

分区分配

分区分配是指为集群制定创建主题时的分区副本分配方案

默认情况下创建主题时总是从编号为0的分区依次轮询进行分配
startIndex是一个随机数,保证多个主题的情况下尽可能地均匀分布分区副本

如果指定机架,broker.rack,会尽可能保证分区在不同的机架上

删除分区

目前暂不支持,涉及到高可用、消息的处理等,复制期间的可用性

kafkaAdminClient

将管理、监控、运维、告警集成到其他系统。

autocreate.topics.enable为false,将创建topic操作封装到资源申请、审核系统,主题创建验证:CreateTopicPolicy接口,validate方法验证合法性。

leader副本

broker节点的leader副本个数多少决定这个节点负载的高低

优先副本 preferred leader,分区平衡

auto.leader.rebalance.enable默认true,controller有个定时任务轮询所有broker节点,计算分区不平衡率是否超过 leader.imbalance.per.broker.percentage参数的比值,默认10%,超过会自动执行有限副本的选举,执行周期默认5min
生产环境不建议设置为默认的true
最好埋点设置相应的告警,然后手动执行分区平衡。

在优先副本的选举过程中,具体的元数据信息会被存入 ZooKeeper的/admin/preferred_replica_election节点,如果这些数据超过了ZooKeeper节点所允许的大小,那么选举就会失败。默认情况下ZooKeeper所允许的节点数据大小为1MB。

在实际生产环境中,一般使用 path-to-json-file 参数来分批、手动地执行优先副本的选举操作。尤其是在应对大规模的 Kafka 集群时,理应杜绝采用非 path-to-json-file参数的选举操作方式。同时,优先副本的选举操作也要注意避开业务高峰期,以免带来性能方面的负面影响。

Kafka提供了 kafka-reassign-partitions.sh 脚本来执行分区重分配的工作,它可以在集群扩容、broker节点失效的场景下对分区进行迁移。

首先创建需要一个包含主题清单的JSON 文件,其次根据主题清单和 broker 节点清单生成一份重分配方案,最后根据这份方案执行具体的重分配动作。

分区重分配的基本原理是先通过控制器为每个分区添加新副本(增加副本因子),新的副本将从分区的leader副本那里复制所有的数据。根据分区的大小不同,复制过程可能需要花一些时间,因为数据是通过网络复制到新副本上的。在复制完成之后,控制器将旧副本从副本清单里移除(恢复为原先的副本因子数)。注意在重分配的过程中要确保有足够的空间。
可以借助kafka-perferred-replica-election.sh 脚本来执行一次优先副本的选举动作,之后可以看到主题 topic-reassign 的具体信息已经趋于完美:

如果要将某个broker下线,那么在执行分区重分配动作之前最好先关闭或重启broker。这样这个broker就不再是任何分区的leader节点了,它的分区就可以被分配给集群中的其他broker。这样可以减少broker间的流量复制,以此提升重分配的性能,以及减少对集群的影响。

复制限流

副本间的复制限流有两种实现方式:kafka-config.sh脚本和kafka-reassign-partitions.sh脚本
kafka-config.sh脚本主要以动态配置的方式来达到限流的目的,在broker级别有两个与复制限流相关的配置参数:follower.replication.throttled.rate和leader.replication.throttled.rate,前者用于设置follower副本复制的速度,后者用于设置leader副本传输的速度,它们的单位都是B/s。通常情况下,两者的配置值是相同的。下面的示例中将broker 1中的leader副本和follower副本的复制速度限制在1024B/s之内,即1KB/s:

在主题级别也有两个相关的参数来限制复制的速度:leader.replication.throttled.replicas 和 follower.replication.throttled.replicas,它们分别用来配置被限制速度的主题所对应的leader副本列表和follower副本列表。

修改副本因子

通过重分配所使用的 kafka-reassign-partition.sh 脚本实现的。分区副本的分配怎么计算?可以程序计算

如何选择合适的分区数

需要压测,producer和consumer的.sh

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐