前置:熟悉javase,熟悉linux,熟悉idea,熟悉hadoop

1. KafKa

1.1 KafKa定义

前端埋点记录用户(浏览,点赞,收藏,评论)到日志服务器,然后通过Flume(小于100m/s)将大日志文件导入到Hadoop集群,每产生一个日志就发送到hadoop(上传100m/s)中。
秒杀活动:Flume采集速度大于200ms/s,就需要KafKa集群。
kafka
Kafka传统定义:一个分布式的基于发布/订阅消息队列(MessageQueue),主要用于大数据实时处理领域。
发布/订阅:消息的发布者不会将消息直接发送给特定的订阅者,而是将发布的消息分为不同的类别,订阅者只接收感兴趣的消息
Kafka最新定义:一个开源的分布式事件流平台(EventStreamingPlatform),用于高性能数据管道,流分析,数据集成和关键任务应用

1.2消息队列

常见消息队列:Kafka,ActiveMQ,RabbitMQ,RocketMQ,大数据场景主要采用Kafka。

1.2.1 传统消息队列应用

缓存/削峰,解耦和异步通信。
缓冲/削峰:控制和优化数据流速度,解决生产和消费消息处理速度不一致的情况。
解耦:独立的扩展或修改两边的处理过程,只要确保他们遵循同样的接口约束。
数据解耦
异步通信:允许用户把一个消息放入队列,但并不立即处理它,然后再需要的时候再去处理。

1.2.2 消息队列的两种模式

  1. 点对点模式 消费者主动拉取数据,消息收到后清除消息。
    点对点
  2. 发布/订阅模式
  • 可以有多个topic主题(浏览,点赞,收藏,评论)
  • 消费者消费数据之后,不删除数据
  • 每个消费者独立,都可以消费数据
    发布/订阅

1.3 KafKa基础架构

KafKa架构

  1. 为方便扩展,提高吞吐量(大数据量),一个topic分为多个partition
  2. 配合分区的设计,提出消费者组(多消费)的概念,组内每个消费者并行消费,注意:某一个分区数据只能由一个消费者消费,防止会出现混乱
  3. 为提高可用性,为每个partition增加若干副本,类似NameNode HA,leader挂了后才会用副本。
  4. ZK中记录谁是leader,Kafka2.8.0后可不采用ZK,用kraft

2. KafKa快速入门

2.1 安装部署

2.1.1 集群规划

hadoop102hadoop103hadoop104
ZKZKZK
KafkaKafkaKafka

2.1.2 集群部署

注意:(在server.properties中修改)每个kafka在集群中的broker.id一定要唯一,修改log.dirs地址,修改zookeeper.connect地址
注意:一定要先关kafka,再关zookeeper

  1. 官方地址:http://kafka.apache.org/downloads.html
  2. 解压,修改文件名,修改配置文件

2.2.2 集群启停脚本

--kf.sh
#!/bin/bash
case $1 in
"start")
	for i in hadoop1 hadoop2 hadoop3
	do
		echo "---启动 $i kafka---"
		ssh $i"/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"
	done
;;
"stop")
	for i in hadoop1 hadoop2 hadoop3
	do
		echo "---启动 $i kafka---"
		ssh $i"/opt/module/kafka/bin/kafka-server-stop.sh"
	done
;;
esac

chomod 777

2.2 Kafka命令行操作

2.2.1 主题命令行操作

  1. 查看操作主题命令: bin/kafka-topics.sh
    连接的kafkaBroker主机名端口:–bootstarp-server<String:server toconnect to>
    操作topic名称:–topic String:topic
    创建主题:–create
    删除主题:–delete
    修改主题:–alter
    查看主题:–list
    查看详情 --describe
    设置分区数 --partitions<Integer:# of partitions>
    设置分区副本:–replication-factor<Integer:replication factor>
  2. 查看当前服务器中的所有topic:bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
  3. 创建first topic:bin/kafka-topics.sh --bootstrap-server
进入kafka目录下,不用进入bin目录下

-- 查看所有topic

.\bin\windows\kafka-topics.bat --zookeeper localhost:2181 --list

-- 查看指定topic信息

.\bin\windows\kafka-topics.bat --zookeeper localhost:2181 --describe --topic test

-- 创建topic信息

.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

-- 创建生产者产生消息,不关闭页面

.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test

-- 创建消费者接收消息,不关闭页面

.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning

-- 删除topic:

.\bin\windows\kafka-topics.bat kafka.admin.DeleteTopicCommand --zookeeper localhost:2181 --delete --topic test

2.2.2 生产者命令行操作

生产者发送:bin/kafka-console-producer.sh --bootstrap-server localhost:9092 -topic first

2.2.3 消费者命令行操作

消费者消费:bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 -topic first
查看历史数据:bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 -topic first-beginning

3 KafKa生产者

3.1 生产者消息发送流程

3.1.1 发送原理

在消息发送过程中有2线程,main和sender。main中创建一个双端队列RecordAccumulator。main将消息发给队列,Sender线程从中拉取消息发送到Kafka Broker。
Kafka发送
DQueue中数据发给Send条件:

  • batch.size:数据累计到batch.size,sender就会发送数据。默认16k
  • linger.ms:如果数据没达到batch.size,sender在linger.ms时间后也会发送数据,单位ms,默认0ms,没有延时。
    应答ack级别:
  • 0:生产者发送的数据不需要等待数据落盘应答
  • 1:生产者发送的数据Leader收到数据后应答
  • -1(all):生产者发送的数据,Leader和ISR队列里所有的节点收齐数据后应答。-1和all等价。

3.1 异步发送API

3.2.1 普通异步发送

  1. 需求:创建Kafka生产者,采用异步的方式发送到Kafka Broker
    注:windows下创建topic: bin目录windows下执行:.\kafka-topics.bat --delete --bootstrap-server 127.0.0.1:9092 --topic test
public class CustomProducer {
    public static void main(String[] args) {
        //0.配置
        Properties properties = new Properties();
        //连接kafka
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
        //指定对应的ke和value序列化类型
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //1.创建kafka生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        //2.发送数据
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<String, String>("first", "xuyu" + i));
        }
        //3.关闭资源
        kafkaProducer.close();
    }
}

3.2.1 带回调函数的异步发送

回调函数会在producer收到ack时调用,为异步调用,该方法有两个参数:元数据信息(RecordMetadata)和异常信息(Exception),如果Exception为null,说明发送成功。
只需在send()方法上添加callback参数即可。
kafkaProducer.send(new ProducerRecord<String, String>("first", "xuyu" + i), new Callback() {
	@Override
	public void onCompletion(RecordMetadata recordMetadata, Exception e) {
		if(e == null){
			System.out.println("主题:" + recordMetadata.topic() + "分区:" + recordMetadata.partition());
		}
	}
);

3.3 同步发送API

只需在异步发送的基础上,再调用get()方法即可。

kafkaProducer.send(new ProducerRecord<String, String>("first", "xuyu" + i)).get();

3.4 生产者分区

3.4.1 分区好处

  1. 便于合理使用资源,每个partition在一个Broker上存储,可把海量数据按照分区切割成一块一块的数据存储在多台Broker上,合理控制分区,可实现负载均衡的效果。
  2. 提高并行度,生产者可以分区为单位发送数据,消费者可以分区为单位进行消费。
    发送分区

3.4.2 生产者发送消息的分区策略

  1. 默认的分区器DefaultPartitioner(源码:IDEA中ctrl+n,全局查找DefaultPartitioner)
  • 有设置分区,则有设置值。
  • 没设置分区,将key的hash与topic的partition数进行取余得到partition值。
  • 没设置分区,也没有key,则用粘性分区,随机选择一分区,待该分区batch满了,kafka再随机一个分区。
    生产者发送消息的分区策略

3.4.2 自定义分区

如果根据企业需求,自己实现分区器。
  1. 需求
    将发送过来的数据如果包含x,就发往0号分区,否则发往1号分区。
  2. 实现
/**
 * @author 自定义分区器
 */
public class MyPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        //获取数据
        String msgValue = value.toString();
        int partition;
        if (msgValue.contains("x")) {
            partition = 0;
        } else {
            partition = 1;
        }
        return partition;
    }
    @Override
    public void close() {}
    @Override
    public void configure(Map<String, ?> map) {}
}
//使用:关联自定义分区器
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com/xuyu/kafka/producer/config/MyPartitioner.java");

3.5 生产经验-如何提高吞吐量

从生产者到kafka集群broker仓库本来一次只发送一个data包数据,修改batch.size(批次大小,默认16k),linger.ms(等待时间,修改为5-100ms)使等待时间和发送数据量增大。同时使用compression.type(压缩snappy)将数据压缩。并将RecordAccumulator(缓冲区大小,修改为64m)调大。

public class CustomProducerParameters {
    public static void main(String[] args) {
        //0. 配置
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); //连接/集群
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());  //key序列化
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());  //value序列化
        //优化参数
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);  //缓冲区大小,默认32M
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);  //批次大小,默认16K
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);  //linger.ms 等待时间,默认0
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");  //压缩,默认none,可选gzip,snappy,lz4,zstd
        //1. 创建生产者
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        //2. 发送
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<String, String>("first", "xuyu" + i));
        }
        //3. 关闭
        kafkaProducer.close();
    }
}

3.6 生产经验-数据可靠性

  1. acks应答原理
    ACK应答级别
    0:生产者发送过来的数据,不需要等数据落盘应答。
    问题:当Producer数据到leader内存中,服务器挂掉,数据会丢失,效率高
    1:生产者发送夺来的数据,Leader收到数据后应答。
    问题:当Producer数据到Leader内存中,应答完成后还没开始同步副本就挂掉了,新的leader不会收到这个信息,因为生产者已经认为发送成功了,数据会丢失,效率中等
    ack1
    -1(all):生产者发送过来的数据,Leader和ISR队列里所有节点都收到数据后应答。
    ack-1
    问题:Leader收到数据,ISR队列所有Follower都同步到数据,可靠性高,效率低,但有一个长时间没有同步到,怎么办?
    解决:Leader维护了一个动态的in-sync replica set(ISR),意为和Leader保持同步的Follower+Leader集合(leader:0 isr:0,1,2),如果Follower长时间未向Leader发送通信请求或同步数据,则该follower将被踢出ISR。该时间阈值由replica.lag.time.max.ms参数设定,默认30s。
    数据可靠性分析:如果分区副本设置为1,或ISR里应答的最小副本数为1(min.insync.replicas默认1),和ack=1的效果是一样的,仍会丢失数据(leader:0 isr:0)。
    数据完全可靠条件 = ACK级别设置为-1 + 分区副本>=2 + ISR里应答的最小副本数>=2
    结论:生产环境中,ack=0不用,ack=1用于日志传输,ack=-1用于传输和钱相关的数据,对可靠性要求高的场景。
        //acks
        properties.put(ProducerConfig.ACKS_CONFIG, "1");  //ack参数,默认-1 可选0,1,-1
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);  //重试次数,默认int最大值21亿

数据重复分析:生产者发送过来的数据,Leader和ISR队列中所有节点收到数据后,leader挂了,没有返回ack,选出新的leader重复收到相同的数据。解决见下文。

3.7 生产经验-数据去重

3.7.1 数据传递语义

  • 至少一次(At LEadtest Once):ACK级别设置为-1 + 分区副本>=2 + ISR里应答的最小副本数>=2,可保证数据不丢失,不能保证数据不重复。
  • 至多一次(At Most Once):ACK级别设置为0,可保证数据不重复,不能保证数据不丢失。
  • 精确一次(Exactly Once):重要数据,既不能重复也不能丢失。
    Kafka 0.11版本后,引入幂等性和事务。

3.7.2 幂等性

  1. 幂等原理
    幂等性指Producer不论向Broker发送多少次重复数据,Broker端只会持久化一条,保证不重复。
    精确一次(Exactly Once) = 幂等性 + 至少一次(ack = -1 + 分区副本数>=2 + ISR最小副本数量>=2)。
    重复数据判断依据:具有<PID,Partition,SeqNumber>相同主键的消息提交时,Broker只会持久化一条。其中PID是Kafka每次重启都会分配一个新的;Partition表示分区号;Sequence Number是单调自增的。所以幂等性只能保证在单分区单会话内不重复。
  2. 如何使用幂等性
    开启参数enable.idempotence默认为true。

3.2.3 生产者事务

  1. Kafka事务原理
    说明:开启事务必须开启幂等性
    Producer在使用事务功能前,必须先自定义一个唯一的transactional.id。有此id,即使客户端挂掉,它重启后也能继续处理未完成的事务。
    Kafka生产者事务
  2. Kafka的事务的5个API
    void initTransactions(); 初始化事务
    void beginTransaction(); 开启事务
    void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId); 在事务内提交已经消费的偏移量(主要用于消费者)
    void commitTransaction(); 提交事务
    void abortTransaction(); 放弃事务(回滚)
  3. 单个Producer,使用事务保证消息仅一次发送。
public class CustomProducerTransactions {
    public static void main(String[] args) {
        //0.配置
        Properties properties = new Properties();
        //连接kafka
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        //指定对应的ke和value序列化类型
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction_id_1");  //指定事务id
        //1.创建kafka生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        //添加事务
        kafkaProducer.initTransactions();
        kafkaProducer.beginTransaction();
        try {
            //2.发送数据
            for (int i = 0; i < 5; i++) {
                kafkaProducer.send(new ProducerRecord<String, String>("first", "xuyu" + i));
            }
            kafkaProducer.commitTransaction();
        } catch (Exception e) {
            kafkaProducer.abortTransaction();
        } finally {
            //3.关闭资源
            kafkaProducer.close();
        }
    }
}

3.8 生产经验-数据有序

单分区有序
单分区内有序:有条件,见下文
多分区,分区与分区间无序。

3.8 生产经验-数据乱序

  1. kafka在1.x版本之前保证数据单分区有序,条件:max.in.flight.requests.per.connection=1(不需要考虑是否开启幂等)
  2. kafka在1.x以及后版本保证数据单分区有序,条件:
  • 未开幂等性:max.in.flight.requests.per.connection=1
  • 开启幂等性:max.in.flight.requests.per.connection需要设置<=5
    原因:在kafka1.x后,启用幂等,kafka服务端会缓存producer发来的最近的5个request的元数据进行重排序,无论如何,都可保证最近5个request有序。因为幂等性的pid可保证。类似tcp数据校验
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐