1 Kafka中的一些数量

1.1 Kafka的个数

Kafka的个数一般是3-5个

计算公式为:(生产者生产速率×副本个数/100)×2 + 1

1.2 Kafka分区的个数

Kafka的分区的个数一般是3-10个

计算公式为:期望峰值速率/min(生产速率,消费速率)

1.3 Kafka的副本个数

kafka的副本一般设置为3个,很多企业大都设置为2个

副本的个数越多可靠性越高,但是副本越多增加了网络IO传输压力

1.4 Kafka日志保存的时间

Kafka日志默认保存的时间是7天

实际生产环境下一般保存为3天

1.5 Kafka主题Topic的个数

Kafka通常多少个日志类型就有多少个Topic,也有对日志类型进行合并的

我们在实际开发中设置的Topic一般一个分层的表设置为一个Topic

1.6 Kafka的磁盘大小

Kafka磁盘大小设置公式为:每天的数据量×副本个数×保存天数 / 0.7

我们在实际开发共磁盘大小设置为:100g×2个副本×3天 / 0.7

1.7 Kafka中数据量的计算

(我们的场景中)

每天的日志数据量为100g,每天大约产生1亿条日志;相当于每秒产生1150条日志

平均每秒1150条日志

低谷每秒50条

高峰每秒2300-23000条

每条日志大小:1k

每秒多少数据量2.0M-20MB

  • Kafka峰值速率为: 20M/s

1.8 Kafka中单条数据大小

默认情况下,Kafka的单条数据最大值为1M

1.9 Kafka监控

公司一般都会有自己开发的监控器

开源的监控器有:KafkaEagle

2 Kafka生产者和消费者

2.1 Kafka如何生产数据的(Kafka生产数据时候的分区策略)

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-qEORUN3i-1617540454770)(Kafka知识总结.assets/image-20201102234117929.png)]

①指定partition,数据就会发送到指定的分区内。

②没有指明partition,但是有key,那么会按照key的hash值与topic的partition树进行取余得到partition的值

③既没有partition也没有key,Kafka会采用粘性分区策略,会随机选择一个分区,并尽可能一直使用这个分区,直到这个分区的batch满了,Kafka会随机选择一个分区进行使用。

2.2 Kafka如何消费数据的(Kafka消费者的分区分配策略)

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-D5qsS95P-1617540454772)(Kafka知识总结.assets/image-20201103205251602-1617515228126.png)]

①Range(默认)

根据主题划分:将某个Topic的不同分区,根据分区数/消费者线程数,将模数分给所有的消费者,余数给第一个消费者.

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-0yhCAzGW-1617540454773)(Kafka知识总结.assets/image-20201103205206053.png)]

②RoundRobin策略

根据组去消费,将一个Topic的不同分区,封装成多个对象,取对象的hash值进行排序,轮询的方式发送给不同的消费者.

3 Kafka如何保证数据的可靠性

3.1 Kafka怎样保证发送数据不丢失?

通过副本同步队列中的ISR应答机制,保证数据的可靠性.

也就是说producer生产的消息发到每个Topic的partition,ISR中的leader和follower什么时候返回ack?

如果producer没有收到ack,那么下一次的时候会重新发送.

3.1.1 副本同步队列ISR

副本同步队列ISR中包括Leader和Follower

什么样的副本才能够进入到ISR中呢?

kafka0.10版本之前有两个参数控制什么样的副本才能够进入到ISR中:

replica.lag.max.messages(延迟条数),follower和leader同步的数据差距小于10条的

replica.lag.time.max.ms(延迟时间),follower和leader交互的时间在10s之内的

但是在kafka0.10版本之后就取消了消息的条数,只保留了交互时间

3.1.2 ISR的级别有哪些?(语义级别)

①0,最低的延迟,leader接收到但是还没有写入到磁盘就返回ack,这个时候offset就增加,producer就继续生产.

​ (另一种表述是说producer发送完,offset就增加,继续生产)

这种情况,如果leader发生故障,数据还没有写入成功,而这个时候offset已经提交了,那么数据就会丢失.

②1,默认的级别,leader完全接收到消息,再返回ack,不等ISR中follower写完,offset就增加,producer就继续生产.

这种情况,leader写完了,但是follower还没有同步完成,这个时候leader挂掉了,这个时候offset已经提交了,那么数据也是有可能丢失掉的.

③-1,ISR中的leader和follower全部接收到消息,再返回ack,offset就增加,producer就继续生产.

有可能导致数据丢失:

​ 如果ISR中只有一个leader,那么这个时候ACK的级别就变成1了,那么这个时候,如果ISR中leader同步完成返回了ack,offset提交了,但是在这个时候leader挂掉了,那么此时集群中的副本数据是缺失的,所以也会导致数据丢失.

有可能导致数据重复:

​ 如果ISR中的leader和follower全部同步完成了,返回ack的时候leader挂掉了,那么producer没有收到ack,offset不会提交,所以它会认为失败了,需要重新发送,所以这个时候会导致数据的重复.

3.1.3 如何保证数据不丢失呢?

ack级别设置为-1,这个时候是at least once,通过上述分析也是有可能导致数据丢失的,所以需要设置两个地方:

①首先设置ack的级别为-1, request.required.acks=-1;

②同时设置ISR中最少的副本数大于1,(默认是1)

min.insync.replicas=多个

3.2 Kafka怎样保证发送数据不重复?

3.2.1 开启幂等性

①如何开启kafka的幂等性?

enable.idempotence=true

②幂等性的实现原理?

首先是ack级别=-1,这个时候数据有可能出现重复,那么需要+幂等性操作.这样就算重发了数据,在持久化的时候,只会持久化一条数据.

原理是:开启幂等性,Producer初始化的时候会分配一个PID,发送数据到Partition的时候,消息会附带上一个Sequence Number,而在Broker端会对<PID, Partition, SeqNumber>做缓存,当具有相同主键的消息提交时,Broker只会持久化一条.

但是幂等性操作,PID在重启的时候会变化,同时不同的Partition也具有不同的主键,所以说幂等性不能够跨分区,也不能跨会话.

3.2.2 Kafka事务

①Kafka的事务在什么时候引入的?

Kafka的事务在0.11版本引入的.

Kafka的事务是可以跨会话,可以跨分区的.

kafka的事务分为producer事务;consumer事务.

②如何开启kafka的事务?

KafkaProducer producer = createKafkaProducer(
  “bootstrap.servers”, “localhost:9092,
  “transactional.id”, “my-transactional-id”);
 
producer.initTransactions();
 
KafkaConsumer consumer = createKafkaConsumer(
  “bootstrap.servers”, “localhost:9092,
  “group.id”, “my-group-id”,
  "isolation.level", "read_committed");
 
consumer.subscribe(singleton(“inputTopic”));
 
while (true) {
  ConsumerRecords records = consumer.poll(Long.MAX_VALUE);
  producer.beginTransaction();
  for (ConsumerRecord record : records)
    producer.send(producerRecord(“outputTopic”, record));
  producer.sendOffsetsToTransaction(currentOffsets(consumer), group); 
  producer.commitTransaction();
}
3.3.3 Kafka生产者事务

kafka的producer事务,在幂等性的基础上,添加了一个全局唯一的TransactionID,并将Producer获取的PID和TransactionID绑定.这样当Producer重启后就可以通过正在进行的TransationID找到原来的PID.

3.3.4 Kafka消费者事务

kafka的consumer事务,解决的是精准一次性消费的问题.

消费者消费了消息,但是没有将offset更新,这个时候消费者挂掉了,再次消费的时候就会重复消费.

如何保证consumer端精准一次性消费呢?

需要将kafka的消费过程和提交offset过程 做原子性绑定.此时可以将offset保存在支持事务的组件中.比如mysql

我们通常是通过手动维护偏移量的方式来保证精准一次性消费.

0.9版本之前的kafka是将消费者消费数据的offset保存在zookeeper中的,从0.9版本之后,默认将offset保存在kafka的一个内置的topic中:_consumer_offsets

3.3 SparkStreaming如何实现精准一次性?

方案1:利用支持事务的数据库

将修改偏移量和业务处理放在一起,要么同时成功,要么同时失败.

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-DuAM9pL3-1617540454775)(Kafka知识总结.assets/image-20210404181751847.png)]

这种涉及到很难处理的分布式事务问题.一般不会使用.

方案2:使用kafka的at least once + 幂等性处理(手动维护偏移量)

①当然可以使用kafka自身的幂等性处理, 也就是开启幂等性和kafka事务.但是这是一个比较重的操作.

​ 我们一般通过SparkStreaming进行去重操作.

②在SparkStreaming程序中手动维护偏移量 + 去重

(1)手动维护偏移量offset保存在哪里?
  • 自动提交偏移量的话

    • 在0.9版本之前, kafka的偏移量是保存在zookeeper中的
    • 在0.9版本之后, kafka的偏移量是保存在kafka本地Topic: _consumer_offsets主题中
  • 如果手动维护的话

    • 关闭了自动提交偏移量的话, 下次都会从最新的位置开始读取偏移量, 并不会记录偏移量消费到哪.

    • 将偏移量保存在redis中, 利用hash这种结构

      [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-dSnGL7ns-1617540454776)(Kafka知识总结.assets/image-20210404182909676.png)]

步骤1: 关闭自动提交偏移量

//自动提交offset偏移量:如果是true,则这个消费者的偏移量会在后台自动提交,但是kafka宕机容易丢失数据
//如果是false,会需要手动维护kafka的偏移量
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)

另外说一下: 自动提交偏移量, 是怎么自动提交的呢?

默认情况下, 自动重置为最新的偏移量. 默认是5s, 消费了数据后, 5s就自动提交偏移量了.

//latest自动重置偏移量为最新的偏移量
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
(2)业务数据保存在哪进行幂等性(也就是去重)?
  • Redis实现去重(mset)
  • ES实现幂等性(put不指定id)
  • HBase实现幂等性(rowkey进行去重)
  • MySQL实现幂等性(主键)

开发中使用的是 利用Redis的mset进行去重, 如果有相同的mid, 新来的mid进不来.

另外我们将数据保存到了ES中, 使用的是ES中的put, 不指定id的操作, 这也是幂等性的, 新来的mid, 会被替换掉.

3.4 Flink如何实现精准一次性?

问题: flink对接kafka中的数据源,如果flink挂了,会怎么样?

首先需要说明的是: flink对接kafka中的数据源, 默认的是自动维护偏移量, 也就是说每5s自动提交一次偏移量.

从哪里开始提交呢? 是从最新的偏移量开始提交. latest

  1. 数据可能会丢失
    • 如果消费者一次拉取了100条消息, 并且提交了偏移量, 但是此时消费者还没有处理完数据, 消费者宕机了, 这个时候会发生再均衡, 由另外一个消费者消费该分区的数据, 而这个消费者会从提交完偏移量的位置, 开始消费, 此时就会发生数据丢失的问题了.
  2. 数据可能会重复
    • 如果消费者一次拉取了100条消息, 处理了一部分后, 还没有提交偏移量的时候, 挂掉了, 这个时候再均衡, 由另外一个消费者消费该分区的数据, 此时就会发生重复消费的问题了.
解决: 开启checkpoint+设置状态后端

kafka中的偏移量保存再flink的检查点中, 存储在设置的statebackend中.

4 Kafka如何实现高效读写的?

Kafka是将数据落盘到磁盘中的, 那么怎样去保证他的高效读写呢?

4.1 分布式+分区+分段

  • Kafka是一个分布式集群
  • Kafka中的数据,以Topic保存在不同的分区中, 不同的分区数据存在不同的broker中, 并发度比较高
  • 每个分区的数据以segment进行保存, 其中一个segment包含一个.log和.index文件组成

4.2 顺序写磁盘

Kafka中的数据写入到segment中的.log文件中, 写的过程是顺序写到文件末端的. 另外还有.index保证其读取速率.

IO的速率快慢, 其实是和寻址方式有关的. 经过测试磁盘的顺序写 性能要 高于 内存的随机写

4.3 使用Page Cache

PageCache是操作系统的缓存, 不占用JVM, 不会受GC等操作的影响.

4.4 零拷贝技术

其中黑线表示传统的数据读取过程;

红线表示Kafka利用的零拷贝技术.

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-j8as8IP9-1617540454777)(Kafka知识总结.assets/image-20210404203406405.png)]

5 Kafka如何保证数据的有序性?

5.1 单分区内有序性

  • Kafka默认情况下是单分区内有序, 不同分区间的数据顺序无法保证

5.2 不同分区间数据有序性如何保证?

方式1: 只设置一个分区, 那么单个分区内有序, 即全局有序.

​ 但是这样的话, 分区数量非常小, 那么并行度就大大降低.

方式2: 发送同一特征的数据到一个分区

(自己写生产者API)

--其中生产者发送数据的send方法
producer.send(new ProducerRecord<>(topic,messageNo,messageStr))
--对这个send方法中的messageNo做文章	
messageNo = database.table.key
    比如: key=100 	按照key的hash(比如为99), 模以3个消费者 那么会进入 0号分区
        	数据分发到 p0 分区上
            p0:insert u1 u2 u3 u4 delete
            p1:
            p2:
        
        key=667 	按照key的hash, 模以3个消费者	进入到1号分区
            p0:
            p1: insert u1 u2 u3 u4 delete
            p2:
这样就能保证对某一条数据的操作分发到单个partition中去,从而保证全局的有序性。      
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐