接下篇
消息系统:

1、消息系统的应用场景

1.1、应用解耦
将一个大型的任务系统分成若干个小模块,将所有的消息进行统一的管理和存储,因此为了解耦,就会涉及到kafka企业级消息平台

1.2、流量控制
秒杀活动当中,一般会因为流量过大,应用服务器挂掉,为了解决这个问题,一般需要在应用前端加上消息队列以控制访问流量。

1、可以控制活动的人数 可以缓解短时间内流量大使得服务器崩掉
2、可以通过队列进行数据缓存,后续再进行消费处理

1.3、日志处理
日志处理指将消息队列用在日志处理中,比如kafka的应用中,解决大量的日志传输问题;
日志采集工具采集 数据写入kafka中;kafka消息队列负责日志数据的接收,存储,转发功能;
日志处理应用程序:订阅并消费 kafka队列中的数据,进行数据分析。

1.4、异步处理
电商网站中,新的用户注册时,需要将用户的信息保存到数据库中,同时还需要额外发送注册的邮件通知、以及短信注册码给用户。但因为发送邮件、发送注册短信需要连接外部的服务器,需要额外等待一段时间,此时,就可以使用消息队列来进行异步处理,从而实现快速响应。

1.2 消息队列中间件

消息队列中间件就是用来存储消息的软件(组件)。举个例子来理解,为了分析网站的用户行为,我们需要记录用户的访问日志。这些一条条的日志,可以看成是一条条的消息,我们可以将它们保存到消息队列中。将来有一些应用程序需要处理这些日志,就可以随时将这些消息取出来处理。

目前市面上的消息队列有很多,例如:Kafka、RabbitMQ、ActiveMQ、RocketMQ、ZeroMQ等。

消息队列就是将需要传输的数据存放在队列中

2、了解消息系统的分类

2.1、点对点

主要采用的队列的方式,如A->B 当B消费的队列中的数据,那么队列的数据就会被删除掉【如果B不消费那么就会存在队列中有很多的脏数据】

2.2、发布-订阅

发布与订阅主要三大组件
主题:一个消息的分类
发布者:将消息通过主动推送的方式推送给消息系统;
订阅者:可以采用拉、推的方式从消息系统中获取数据

大多数的消息系统是基于发布-订阅消息系统

3、kafka企业级消息系统

3.1、简介

kafka是最初由linkedin公司开发的,使用scala语言编写,kafka是一个分布式,分区的,多副本的,多订阅者的日志系统(分布式MQ系统),可以用于搜索日志,监控日志,访问日志等。

Apache Kafka是一个分布式流平台。一个分布式的流平台应该包含3点关键的能力:
1.发布和订阅流数据流,类似于消息队列或者是企业消息传递系统
2.以容错的持久化方式存储数据流
处理数据流

1、 Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system.
2、Store streams of records in a fault-tolerant durable way.
3、Process streams of records as they occur.

3.2、支持的语言

kafka目前支持多种客户端的语言:java、python、c++、php等

3.3、apache kafka是一个分布式发布-订阅消息系

apache kafka是一个分布式发布-订阅消息系统和一个强大的队列,可以处理大量的数据,并使能够将消息从一个端点传递到另一个端点,kafka适合离线和在线消息消费。kafka消息保留在磁盘上,并在集群内复制以防止数据丢失。kafka构建在zookeeper同步服务之上。它与apache和spark非常好的集成,应用于实时流式数据分析。

3.4、kafka的好处

可靠性:分布式的,分区,复制和容错的。
可扩展性:kafka消息传递系统轻松缩放,无需停机。
耐用性:kafka使用分布式提交日志,这意味着消息会尽可能快速的保存在磁盘上,因此它是持久的。
性能:kafka对于发布和订阅消息都具有高吞吐量。即使存储了许多TB的消息,他也爆出稳定的性能。
kafka非常快:保证零停机和零数据丢失。

4、kafka应用场景

4.1、指标分析
kafka 通常用于操作监控数据。这设计聚合来自分布式应用程序的统计信息, 以产生操作的数据集中反馈

4.2、日志聚合解决方法
kafka可用于跨组织从多个服务器收集日志,并使他们以标准的合适提供给多个服务器。

4.3、流式处理
流式处理框架(spark,storm,flink)重主题中读取数据,对齐进行处理,并将处理后的数据写入新的主题,供 用户和应用程序使用,kafka的强耐久性在流处理的上下文中也非常的有用。

5、kafka架构

我们通常将Apache Kafka用在两类程序:
1.建立实时数据管道,以可靠地在系统或应用程序之间获取数据
2.构建实时流应用程序,以转换或响应数据流
在这里插入图片描述
上图,我们可以看到:
1.Producers:可以有很多的应用程序,将消息数据放入到Kafka集群中。
2.Consumers:可以有很多的应用程序,将消息数据从Kafka集群中拉取出来。
3.Connectors:Kafka的连接器可以将数据库中的数据导入到Kafka,也可以将Kafka的数据导出到
数据库中。
4.Stream Processors:流处理器可以从Kafka中拉取数据,也可以将数据写入到Kafka中。

特性ActiveMQRabbitMQKafkaRocketMQ
所属社区/公司ApacheMozilla Public LicenseApacheApache/Ali
成熟度成熟成熟成熟比较成熟
生产者-消费者模式支持支持支持支持
发布-订阅支持支持支持支持
REQUEST-REPLY支持支持-支持
API完备性低(静态配置)
多语言支持支持JAVA优先语言无关支持,JAVA优先支持
单机呑吐量万级(最差)万级十万级十万级(最高)
消息延迟-微秒级毫秒级-
可用性高(主从)高(主从)非常高(分布式)
消息丢失-理论上不会丢失-
消息重复-可控制理论上会有重复-
事务支持不支持支持支持
文档的完备性
提供快速入门
首次部署难度-

kafka整体架构图

在这里插入图片描述一个典型的kafka集群中包含若干个Producer,若干个Broker,若干个Consumer,以及一个zookeeper集群; kafka通过zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行Rebalance(负载均 衡);Producer使用push模式将消息发布到Broker;Consumer使用pull模式从Broker中订阅并消费消息。

4、kafka术语

4.1、kafka中术语介绍

Broker

kafka集群中包含一个或者多个服务实例,这种服务实例被称为Broker,一个Broker可以理解为一台服务器
在这里插入图片描述
broker是无状态(Sateless)的,它们是通过ZooKeeper来维护集群状态
一个Kafka的broker每秒可以处理数十万次读写,每个broker都可以处理TB消息而不影响性能

zookeeper

ZK用来管理和协调broker,并且存储了Kafka的元数据(例如:有多少topic、partition、consumer)

ZK服务主要用于通知生产者和消费者Kafka集群中有新的broker加入、或者Kafka集群中出现故障的broker

Kafka正在逐步想办法将ZooKeeper剥离,维护两套集群成本较高,社区提出KIP-500就是要替换掉ZooKeeper的依赖。“Kafka on Kafka”——Kafka自己来管理自己的元数据

Topic

每条发布到kafka集群的消息都有一个类别,这个类别就叫做Topic

主题是一个逻辑概念,用于生产者发布数据,消费者拉取数据
Kafka中的主题必须要有标识符,而且是唯一的,Kafka中可以有任意数量的主题,没有数量上的限制
在主题中的消息是有结构的,一般一个主题包含某一类消息
一旦生产者发送消息到主题中,这些消息就不能被更新(更改)

分区(Partitions)

Partition是一个物理上的概念,每个Topic包含一个或者多个Partition

副本(Replicas)

副本可以确保某个服务器出现故障时,确保数据依然可用
在Kafka中,一般都会设计副本的个数>1

偏移量(offset)

在这里插入图片描述
offset记录着下一条将要发送给Consumer的消息的序号
默认Kafka将offset存储在ZooKeeper中
在一个分区中,消息是有顺序的方式存储着,每个在分区的消费都是有一个递增的id。这个就是偏移量offset
偏移量在分区中才是有意义的。在分区之间,offset是没有任何意义的

Producer

负责发布消息到kafka的Broker中。

Consumer

消息消费者,向kafka的broker中读取消息的客户端

Consumer Group

在这里插入图片描述

  • 每一个Consumer属于一个特定的Consumer Group(可以为每个Consumer指定 groupName)
  • consumer group是kafka提供的可扩展且具有容错性的消费者机制
  • 一个消费者组可以包含多个消费者
  • 一个消费者组有一个唯一的ID(group Id)
  • 组内的消费者一起消费主题的所有分区数据

4.2、kafka中topic说明

kafka将消息以topic为单位进行归类

topic特指kafka处理的消息源(feeds of messages)的不同分类。

topic是一种分类或者发布的一些列记录的名义上的名字。kafka主题始终是支持多用户订阅的;也就是说,一 个主题可以有零个,一个或者多个消费者订阅写入的数据。

在kafka集群中,可以有无数的主题。

生产者和消费者消费数据一般以主题为单位。更细粒度可以到分区级别。

4.3、kafka中分区数(Partitions)

Partitions:分区数

一个broker服务下,是否可以创建多个分区?
可以的,broker数与分区数没有关系; 在kafka中,每一个分区会有一个编号:编号从0开始
某一个分区的数据是有序的
在这里插入图片描述
说明-数据是有序 如何保证一个主题下的数据是有序的?(生产是什么样的顺序,那么消费的时候也是什么样的顺序)

一个主题(topic)下面有一个分区(partition)即可
topic的Partition数量在创建topic时配置。

Partition数量决定了每个Consumer group中并发消费者的最大数量。(一个topic当中的每一个分区同一时间只能被 一个消费组里的一个线程进行消费)

Consumer group A 有两个消费者来读取4个partition中数据;Consumer group B有四个消费者来读取4个 partition中的数据
在这里插入图片描述

4.4、kafka中副本数( Partition Replication)

副本数(replication-factor):控制消息保存在几个broker(服务器)上,一般情况下等于broker的个数

一个broker服务下,是否可以创建多个副本因子?
不可以;创建主题时,副本因子应该小于等于可用的broker数。

副本因子操作以分区为单位的。每个分区都有各自的主副本和从副本;主副本叫做leader,从副本叫做 follower(在有多个副本的情况下,kafka会为同一个分区下的分区,设定角色关系:一个leader和N个 follower),处于同步状态的副本叫做in-sync-replicas(ISR);follower通过拉的方式从leader同步数据。消费 者和生产者都是从leader读写数据,不与follower交互。
副本因子的作用:让kafka读取数据和写入数据时的可靠性。

副本因子是包含本身|同一个副本因子不能放在同一个Broker中。

如果某一个分区有三个副本因子,就算其中一个挂掉,那么只会剩下的两个钟,选择一个leader,但不会在其 他的broker中,另启动一个副本(因为在另一台启动的话,存在数据传递,只要在机器之间有数据传递,就 会长时间占用网络IO,kafka是一个高吞吐量的消息系统,这个情况不允许发生)所以不会在零个broker中启 动。

如果所有的副本都挂了,生产者如果生产数据到指定分区的话,将写入不成功。

lsr表示:当前可用的副本

4.5、kafka Partition offset

任何发布到此partition的消息都会被直接追加到log文件的尾部,每条消息在文件中的位置称为offset(偏移量),
offset是一个long类型数字,它唯一标识了一条消息,消费者通过(offset,partition,topic)跟踪记录消费的偏移量。

4.6、kafka分区和消费组之间的关系

消费组: 由一个或者多个消费者组成,同一个组中的消费者对于同一条消息只消费一次

消费组里消费者的个数,应该小于等于该主题下的分区数。如下所示:

如:某一个主题有4个分区,那么消费组中的消费者应该小于4,而且最好与分区数成整数倍
1 2 4

一个topic当中的每一个分区同一时间只能被 一个消费组里的一个线程进行消费

总结:分区数越多,同一时间可以有越多的消费者来进行消费,消费数据的速度就会越快,提高消费的性能

5、集群搭建:

验证zk集群是否可用:
bin/zkServer.sh status 一定要保证出现一个leader,其他的都是follower即可

5.1 kafka集群安装

下载安装包
http://archive.apache.org/dist/kafka/0.10.0.0/kafka_2.11-0.10.0.0.tgz

cd /export/servers/kafka_2.11-0.10.0.0/config
vim server.properties
broker.id=0  #
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/export/servers/kafka_2.11-0.10.0.0/logs #	
num.partitions=2
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=node01:2181,node02:2181,node03:2181 #
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
delete.topic.enable=true #
host.name=node01 #

5.2、启动集群

1、启动命令-前台启动
node01服务器执行以下命令来启动kafka集群

cd /export/servers/kafka_2.11-0.10.0.0
bin/kafka-server-start.sh config/server.properties

2、启动命令-后台启动
node01执行以下命令将kafka进程启动在后台

cd /export/servers/kafka_2.11-0.10.0.0
nohup bin/kafka-server-start.sh config/server.properties 2>&1 &

3、停止命令
node01执行以下命令便可以停止kakfa进程
cd /export/servers/kafka_2.11-0.10.0.0
bin/kafka-server-stop.sh

6、Kafka集群操作(控制台操作)

6.1、创建一个Topic

创建了一个名字为test的主题, 有三个分区,有两个副本
node01执行以下命令来创建topic

cd /export/servers/kafka_2.11-0.10.0.0
bin/kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 2 --partitions 3 --topic test

6.2、查看主题命令

查看kafka当中存在的主题
node01使用以下命令来查看kafka当中存在的topic主题

cd /export/servers/kafka_2.11-0.10.0.0
bin/kafka-topics.sh  --list --zookeeper node01:2181,node02:2181,node03:2181

6.3、生产者生产数据

模拟生产者来生产数据
node01服务器执行以下命令来模拟生产者进行生产数据

cd /export/servers/kafka_2.11-0.10.0.0
bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic test

6.4、消费者消费数据

node02服务器执行以下命令来模拟消费者进行消费数据

bin/ kafka-console-consumer.sh --from-beginning --topic test  --zookeeper node01:2181,node02:2181,node03:2181

6.5、运行describe topics命令

node01执行以下命令运行describe查看topic的相关信息

bin/kafka-topics.sh --describe --zookeeper node01:2181 --topic test

结果说明:

这是输出的解释。第一行给出了所有分区的摘要,每个附加行提供有关一个分区的信息。由于我们只有一个分 区用于此主题,因此只有一行。

“leader”是负责给定分区的所有读取和写入的节点。每个节点将成为随机选择的分区部分的领导者。(因为在kafka中 如果有多个副本的话,就会存在leader和follower的关系,表示当前这个副本为leader所在的broker是哪一个)
“replicas”是复制此分区日志的节点列表,无论它们是否为领导者,或者即使它们当前处于活动状态。(所有副本列表 0 ,1,2)
“isr”是“同步”复制品的集合。这是副本列表的子集,该列表当前处于活跃状态并且已经被领导者捕获。(可用的列表 数)

6.6、修改topic属性

6.6.1、增加topic分区数
任意kafka服务器执行以下命令可以增加topic分区数
6.6.2、增加配置
动态修改kakfa的配置
任意kafka服务器执行以下命令可以增加topic分区数

bin/kafka-topics.sh --zookeeper node01:2181 --alter --topic test --config flush.messages=1

6.6.3、删除配置
动态删除kafka集群配置

bin/kafka-topics.sh --zookeeper node01:2181 --alter --topic test --delete-config flush.messages

6.6.4、删除topic
目前删除topic在默认情况下知识打上一个删除的标记,在重新启动kafka后才删除。如果需要立即删除,则需要在
server.properties中配置:

delete.topic.enable=true

然后执行以下命令进行删除topic

kafka-topics.sh --zookeeper zkhost:port --delete --topic topicName

kafka-producer-perf-test.sh

生产消息基准测试
在生产环境中,推荐使用生产5000W消息,这样会性能数据会更准确些。

bin/kafka-producer-perf-test.sh --topic benchmark --num-records 5000000 
--throughput -1 --record-size 1000 --producer-props bootstrap.servers=node1.itcast.cn:9092,
node2.itcast.cn:9092,node3.itcast.cn:9092 acks=1
--topic topic的名字
--num-records	总共指定生产数据量(默认5000W)
--throughput	指定吞吐量——限流(-1不指定)
--record-size   record数据大小(字节)
--producer-props bootstrap.servers=192.168.1.20:9092,192.168.1.21:9092,192.168.1.22:9092 acks=1 指定Kafka集群地址,ACK模式

kafka-consumer-perf-test.sh

bin/kafka-consumer-perf-test.sh --broker-list node1.itcast.cn:9092,
node2.itcast.cn:9092,node3.itcast.cn:9092 --topic benchmark 
--fetch-size 1048576 --messages 5000000
bin/kafka-consumer-perf-test.sh
--broker-list 指定kafka集群地址
--topic 指定topic的名称
--fetch-size 每次拉取的数据大小
--messages 总共要消费的消息个数

7、kafka集群操作-JavaAPI操作

添加依赖

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version> 0.10.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>0.10.0.0</version>
    </dependency>

</dependencies>

<build>
    <plugins>
        <!-- java编译插件 -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
    </plugins>
</build>

生产者代码

public class OrderProducer {
        public static void main(String[] args) throws InterruptedException {
            /* 1、连接集群,通过配置文件的方式
             * 2、发送数据-topic:order,value
             */
            Properties props = new Properties();
            props.put("bootstrap.servers", "node01:9092");
            props.put("acks", "all");
            props.put("retries", 0);
            props.put("batch.size", 16384);
            props.put("linger.ms", 1);
            props.put("buffer.memory", 33554432);
            props.put("key.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>
                    (props);
            for (int i = 0; i < 1000; i++) {
// 发送数据 ,需要一个producerRecord对象,最少参数 String topic, V value
                kafkaProducer.send(new ProducerRecord<String, String>("order", "订单信息"+i));
                Thread.sleep(100);
            }
        }
}

消费者代码-自动提交offset值

public class OrderConsumer {
    public static void main(String[] args) {
// 1\连接集群
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop-01:9092");
        props.put("group.id", "test");

//以下两行代码 ---消费者自动提交offset值 
		props.put("enable.auto.commit", "true"); 
		props.put("auto.commit.interval.ms",  "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>
                (props);
//		 2、发送数据 发送数据需要,订阅下要消费的topic。	order 
			kafkaConsumer.subscribe(Arrays.asList("order")); 
			while (true) {
        ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(100);// jdk queue offer插入、poll获取元素。 blockingqueue put插入原生, take获取元素
        for (ConsumerRecord<String, String> record : consumerRecords) {
            System.out.println("消费的数据为:" + record.value());
        }
    }
}

消费者代码-手动提交offset

如果Consumer在获取数据后,需要加入处理,数据完毕后才确认offset,需要程序来控制offset的确认? 关闭自动提交确认选项

props.put("enable.auto.commit",  "false");

手动提交offset值

 kafkaConsumer.commitSync();
while(true)
        {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                buffer.add(record);
            }
            if (buffer.size() >= minBatchSize) {
                insertIntoDb(buffer);
// 手动提交offset值
                consumer.commitSync();
                buffer.clear();
            }
        }

commitSync将所有已接收的记录标记为已提交。

消费者代码-完成处理每个分区中的记录后提交偏移量

在某些情况下,您可能希望通过明确指定偏移量 来更好地控制已提交的记录。 以下我们在完成处理每个分区中的记录后提交偏移量。

public class ConmsumerPartition {

    /**
     * 处理完每一个分区里面的数据,就马上提交这个分区里面的数据
     * @param args
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092");
        props.put("group.id", "test_group");
        props.put("enable.auto.commit", "false"); //禁用自动提交offset,后期我们手动提交offset
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
        kafkaConsumer.subscribe(Arrays.asList("mypartition"));
        while (true){
            //通过while ture进行消费数据
            ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
            //获取mypartition这个topic里面所有的分区
            Set<TopicPartition> partitions = records.partitions();
            //循环遍历每一个分区里面的数据,然后将每一个分区里面的数据进行处理,处理完了之后再提交每一个分区里面的offset
            for (TopicPartition partition : partitions) {
                //获取每一个分区里面的数据
                List<ConsumerRecord<String, String>> records1 = records.records(partition);
                for (ConsumerRecord<String, String> record : records1) {
                    System.out.println(record.value()+"===="+ record.offset());
                }
                //获取我们分区里面最后一条数据的offset,表示我们已经消费到了这个offset了
                long offset = records1.get(records1.size() - 1).offset();
                //提交offset
                //提交我们的offset,并且给offset加1  表示我们下次从没有消费的那一条数据开始消费
                kafkaConsumer.commitSync(Collections.singletonMap(partition,new OffsetAndMetadata(offset + 1)));
            }
        }
    }
}

注意事项:
提交的偏移量应始终是应用程序将读取的下一条消息的偏移量。 因此,在调用commitSync(偏移量)时,应该 在最后处理的消息的偏移量中添加一个

消费者代码-使用消费者消费指定分区的数据

1、如果进程正在维护与该分区关联的某种本地状态(如本地磁盘上的键值存储),那么它应该只获取它在磁盘上 维护的分区的记录。

2、如果进程本身具有高可用性,并且如果失败则将重新启动(可能使用YARN,Mesos或AWS工具等集群管理框 架,或作为流处理框架的一部分)。 在这种情况下,Kafka不需要检测故障并重新分配分区,因为消耗过程将在另 一台机器上重新启动。

public class ConsumerSomePartition {
    //实现消费一个topic里面某些分区里面的数据
    public static void main(String[] args) {

        Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        props.put("group.id", "test_group");  //消费组
        props.put("enable.auto.commit", "true");//允许自动提交offset
        props.put("auto.commit.interval.ms", "1000");//每隔多久自动提交offset
        props.put("session.timeout.ms", "30000");
        //指定key,value的反序列化类
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        //获取kafkaConsumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        //通过consumer订阅某一个topic,进行消费.会消费topic里面所有分区的数据
       // consumer.subscribe();

        //通过调用assign方法实现消费mypartition这个topic里面的0号和1号分区里面的数据

        TopicPartition topicPartition0 = new TopicPartition("mypartition", 0);
        TopicPartition topicPartition1 = new TopicPartition("mypartition", 1);
        //订阅我们某个topic里面指定分区的数据进行消费
        consumer.assign(Arrays.asList(topicPartition0,topicPartition1));

        int i =0;
        while(true){
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                i++;
                System.out.println("数据值为"+ record.value()+"数据的offset为"+ record.offset());
                System.out.println("消费第"+i+"条数据");
            }

        }
    }

}

注意事项:
1、要使用此模式,您只需使用要使用的分区的完整列表调用assign(Collection),而不是使用subscribe订阅 主题。
2、主题与分区订阅只能二选一

消费者数据丢失-数据重复

说明:
1、已经消费的数据对于kafka来说,会将消费组里面的offset值进行修改,那什么时候进行修改了?是在数据消费 完成之后,比如在控制台打印完后自动提交;
2、提交过程:是通过kafka将offset进行移动到下个message所处的offset的位置。
3、拿到数据后,存储到hbase中或者mysql中,如果hbase或者mysql在这个时候连接不上,就会抛出异常,如果在处理数据的时候已经进行了提交,那么kafka上的offset值已经进行了修改了,但是hbase或者mysql中没有数据,这个时候就会出现数据丢失。
4、什么时候提交offset值?在Consumer将数据处理完成之后,再来进行offset的修改提交。默认情况下offset是 自动提交,需要修改为手动提交offset值。

5、如果在处理代码中正常处理了,但是在提交offset请求的时候,没有连接到kafka或者出现了故障,那么该次修 改offset的请求是失败的,那么下次在进行读取同一个分区中的数据时,会从已经处理掉的offset值再进行处理一 次,那么在hbase中或者mysql中就会产生两条一样的数据,也就是数据重复

Future接口

public interface Future<V>
Future表示异步计算的结果。方法用于检查计算是否完成、等待计算完成以及检索计算的结果。结果只能在计算完成后使用get方法检索,必要时阻塞直到准备好。取消是由取消方法执行的。还提供了其他方法来确定任务是否正常完成或被取消。一旦计算完成,计算不能取消。如果为了可取消性而使用Future,但不提供可用的结果,可以声明Future<?>并返回null作为底层任务的结果。

Future常用方法

boolean cancel(boolean mayInterruptIfRunning)
//试图取消此任务的执行。如果任务已经完成、已经取消或由于其他原因无法取消,则此尝试将失败。
//如果成功,且调用cancel时此任务尚未启动,则此任务将永远不会运行。如果任务已经启动,那么
//mayInterruptIfRunning参数确定是否应该中断执行该任务的线程以试图停止该任务。

boolean isCancelled();
//如果该任务在正常完成之前被取消,则返回true。

boolean isDone();
//如果任务完成,返回true。完成可能是由于正常终止、异常或取消——在所有这些情况下,该方法将返回true。

V get()
//等待计算完成,然后检索其结果。

V get(long timeout, TimeUnit unit)
//如果需要,最多等待给定的时间来完成计算,然后检索其结果

TimeUnit

public enum TimeUnit
TimeUnit表示给定粒度单元上的持续时间,并提供实用方法来在各个单元之间进行转换,并在这些单元中执行计时和延迟操作。
TimeUnit不维护时间信息,但只帮助组织和使用可能在不同上下文中分别维护的时间表示。纳秒的定义是千分之一微秒,一微秒是千分之一毫秒,一毫秒是千分之一秒,一分钟是60秒,一小时是60分钟,一天是24小时。

NANOSECONDS
MICROSECONDS
MILLISECONDS
SECONDS
MINUTES
HOURS
DAYS

TimeUnit常用方法

public long convert(long sourceDuration, TimeUnit sourceUnit)
//将给定单位中的给定持续时间转换为此单位
//For example, to convert 10 minutes to milliseconds, use: 
//TimeUnit.MILLISECONDS.convert(10L, TimeUnit.MINUTES)

KafkaProducer

public class KafkaProducer<K, V> implements Producer<K, V>
发布记录到Kafka集群的Kafka客户端。
生产者是线程安全的,在线程之间共享一个生产者实例通常比拥有多个实例要快。

生产者由一个缓冲空间池和一个后台I/O线程组成,后者负责将这些记录转换为请求并将它们传输给集群。如果在使用后没有关闭生产者,将会泄漏这些资源。

KafkaProducer常用方法

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)
//异步发送记录到主题,并在确认发送后调用提供的回调。
//发送是异步的,一旦记录被存储在等待发送的记录的缓冲区中,该方法将立即返回。这允许并行地发送许多记录,
//而不必阻塞以等待每个记录之后的响应。

//发送的结果是一个RecordMetadata,它指定了记录被发送到的分区、它被分配的偏移量和记录的时间戳。如果
//主题使用CreateTime,则时间戳将是用户提供的时间戳,或者如果用户没有为记录指定时间戳,则记录发送时间
//如果LogAppendTime用于主题,时间戳将是消息被附加时Kafka代理的本地时间。

public Future<RecordMetadata> send(ProducerRecord<K, V> record)
//相当于send(record, null)

ProducerRecord

public final class ProducerRecord<K, V>
要发送给Kafka的键/值对。这包括要将记录发送到的主题名、一个可选的分区号以及一个可选的键和值。
如果指定了有效的分区号,则在发送记录时将使用该分区。如果没有指定分区,但存在一个键,那么将使用键的散列选择分区。如果键和分区都不存在,那么分区将以循环的方式分配。该记录还具有相关的时间戳。如果用户没有提供时间戳,生产者将用当前时间戳。Kafka最终使用的时间戳取决于为主题配置的时间戳类型。

ProducerRecord构造方法

public ProducerRecord(String topic, Integer partition, 
			Long timestamp, K key, V value)
public ProducerRecord(String topic, Integer partition, K key, V value)

public ProducerRecord(String topic, K key, V value)

public ProducerRecord(String topic, V value)

ProducerRecord常用方法

public String topic()
//返回此记录要发送到的主题

public Integer partition()

public Long timestamp()

public K key()
	
public V value()

KafkaConsumer

public class KafkaConsumer<K, V> implements Consumer<K, V>
消费来自Kafka集群的记录的Kafka客户端。
它将透明地处理Kafka集群中的服务器故障,并透明地适应它获取的数据分区在集群内迁移。该客户机还与服务器进行交互,以允许消费者组使用消费者组对消费进行负载平衡(如下所述)。

消费者维护到必要的brokers的TCP连接以获取数据。如果在使用后没有关闭消费者,这些连接将会泄漏。消费者不是线程安全的

KafkaConsumer构造方法

public KafkaConsumer(Properties properties)
//通过提供一个Properties对象作为配置来实例化消费者。有效的配置记录在ConsumerConfig中

KafkaConsumer常用方法

public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener)
//订阅给定的主题列表以获得动态分配的分区。主题订阅不是增量的。这个列表将替换当前的赋值(如果有的话)。
//注意,不能通过assign(Collection)将主题订阅与组管理以及手动分区分配结合起来。如果给定的主题
//列表为空,则将其视为unsubscribe()。

public void subscribe(Collection<String> topics)

public void unsubscribe()
//取消订阅当前订阅的主题。这还会清除通过assign(Collection)直接分配的任何分区

public ConsumerRecords<K, V> poll(long timeout)
//使用subscribe/assign APIs之一从指定的主题或分区获取数据。
//在轮询数据之前没有订阅任何主题或分区是错误的。
//timeout:
//如果缓冲区中没有可用的数据,在轮询中等待所用的时间(以毫秒为单位)。
//如果为0,则立即返回缓冲区中当前可用的任何记录,否则返回空。一定不是负的。


public void assign(Collection<TopicPartition> partitions)
//手动将分区列表分配给该用户。该接口不允许增量赋值,将替换之前的赋值(如果有的话)。

public void commitSync()
//提交在最后一次poll()中为所有订阅的主题和分区列表返回的偏移量。
//这将只向Kafka提交偏移量。使用这个API提交的偏移量将在每次重平衡后的第一次取回和启动时
//使用。因此,如果你需要在Kafka以外的任何地方存储偏移量,这个API不应该被使用

ConsumerRecord

public final class ConsumerRecord<K, V>

从Kafka接收的键/值对。这包括一个主题名和一个分区号,一个指向Kafka分区中的记录的偏移量。

ConsumerRecord构造方法

    public ConsumerRecord(String topic,
                          int partition,
                          long offset,
                          K key,
                          V value)
//创建从指定主题和分区接收的记录
    public ConsumerRecord(String topic,
                          int partition,
                          long offset,
                          long timestamp,
                          TimestampType timestampType,
                          long checksum,
                          int serializedKeySize,
                          int serializedValueSize,
                          K key,
                          V value)

ConsumerRecord常用方法

public K key()
//返回键

public V value() 
//返回值

public long offset()
public long timestamp()
public int partition()
public String topic()

ConsumerRecords

一个容器,用于保存特定主题的每个分区的ConsumerRecord列表。对于Consumer.poll(long)操作返回的每个主题分区,都有一个ConsumerRecord列表。

ConsumerRecords常用方法

public Set<TopicPartition> partitions()
//在该记录集中包含数据的分区集合(如果没有返回数据,则可能为空)

public List<ConsumerRecord<K, V>> records(TopicPartition partition)
//只获取给定分区的记录

ConsumerConfig

消费者配置 keys

GROUP_ID_CONFIG = "group.id"
MAX_POLL_RECORDS_CONFIG = "max.poll.records"
HEARTBEAT_INTERVAL_MS_CONFIG = "heartbeat.interval.ms"
ENABLE_AUTO_COMMIT_CONFIG = "enable.auto.commit"
...

开发StreamAPI

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;

import java.util.Properties;

public class Stream {

    /**
     * 通过streamAPI实现将数据从test里面读取出来,写入到test2里面去
     * @param args
     */
    public static void main(String[] args) {

        Properties properties = new Properties();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG,"bigger");
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"node01:9092");
        properties.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());//key的序列化和反序列化的类
        properties.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());
        //获取核心类 KStreamBuilder
        KStreamBuilder kStreamBuilder = new KStreamBuilder();
        //通过KStreamBuilder调用stream方法表示从哪个topic当中获取数据
        //调用mapValues方法,表示将每一行value都给取出来
        //line表示我们取出来的一行行的数据
        //将转成大写的数据,写入到test2这个topic里面去
        kStreamBuilder.stream("test").mapValues(line -> line.toString().toUpperCase()).to("test2");
        //通过kStreamBuilder可以用于创建KafkaStream  通过kafkaStream来实现流失的编程启动
        KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, properties);
        kafkaStreams.start();  //调用start启动kafka的流 API
    }

}
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐