Kafka基础(一)
消息系统:1、消息系统的应用场景1.1、应用解耦将一个大型的任务系统分成若干个小模块,将所有的消息进行统一的管理和存储,因此为了解耦,就会涉及到kafka企业级消息平台1.2、流量控制秒杀活动当中,一般会因为流量过大,应用服务器挂掉,为了解决这个问题,一般需要在应用前端加上消息队列以控制访问流量。1、可以控制活动的人数 可以缓解短时间内流量大使得服务器崩掉2、可以通过队列进行数据缓存,后续再进行消
接下篇
消息系统:
1、消息系统的应用场景
1.1、应用解耦
将一个大型的任务系统分成若干个小模块,将所有的消息进行统一的管理和存储,因此为了解耦,就会涉及到kafka企业级消息平台
1.2、流量控制
秒杀活动当中,一般会因为流量过大,应用服务器挂掉,为了解决这个问题,一般需要在应用前端加上消息队列以控制访问流量。
1、可以控制活动的人数 可以缓解短时间内流量大使得服务器崩掉
2、可以通过队列进行数据缓存,后续再进行消费处理
1.3、日志处理
日志处理指将消息队列用在日志处理中,比如kafka的应用中,解决大量的日志传输问题;
日志采集工具采集 数据写入kafka中;kafka消息队列负责日志数据的接收,存储,转发功能;
日志处理应用程序:订阅并消费 kafka队列中的数据,进行数据分析。
1.4、异步处理
电商网站中,新的用户注册时,需要将用户的信息保存到数据库中,同时还需要额外发送注册的邮件通知、以及短信注册码给用户。但因为发送邮件、发送注册短信需要连接外部的服务器,需要额外等待一段时间,此时,就可以使用消息队列来进行异步处理,从而实现快速响应。
1.2 消息队列中间件
消息队列中间件就是用来存储消息的软件(组件)。举个例子来理解,为了分析网站的用户行为,我们需要记录用户的访问日志。这些一条条的日志,可以看成是一条条的消息,我们可以将它们保存到消息队列中。将来有一些应用程序需要处理这些日志,就可以随时将这些消息取出来处理。
目前市面上的消息队列有很多,例如:Kafka、RabbitMQ、ActiveMQ、RocketMQ、ZeroMQ等。
消息队列就是将需要传输的数据存放在队列中
2、了解消息系统的分类
2.1、点对点
主要采用的队列的方式,如A->B 当B消费的队列中的数据,那么队列的数据就会被删除掉【如果B不消费那么就会存在队列中有很多的脏数据】
2.2、发布-订阅
发布与订阅主要三大组件
主题:一个消息的分类
发布者:将消息通过主动推送的方式推送给消息系统;
订阅者:可以采用拉、推的方式从消息系统中获取数据
大多数的消息系统是基于发布-订阅消息系统
3、kafka企业级消息系统
3.1、简介
kafka是最初由linkedin公司开发的,使用scala语言编写,kafka是一个分布式,分区的,多副本的,多订阅者的日志系统(分布式MQ系统),可以用于搜索日志,监控日志,访问日志等。
Apache Kafka是一个分布式流平台。一个分布式的流平台应该包含3点关键的能力:
1.发布和订阅流数据流,类似于消息队列或者是企业消息传递系统
2.以容错的持久化方式存储数据流
处理数据流
1、 Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system.
2、Store streams of records in a fault-tolerant durable way.
3、Process streams of records as they occur.
3.2、支持的语言
kafka目前支持多种客户端的语言:java、python、c++、php等
3.3、apache kafka是一个分布式发布-订阅消息系
apache kafka是一个分布式发布-订阅消息系统和一个强大的队列,可以处理大量的数据,并使能够将消息从一个端点传递到另一个端点,kafka适合离线和在线消息消费。kafka消息保留在磁盘上,并在集群内复制以防止数据丢失。kafka构建在zookeeper同步服务之上。它与apache和spark非常好的集成,应用于实时流式数据分析。
3.4、kafka的好处
可靠性:分布式的,分区,复制和容错的。
可扩展性:kafka消息传递系统轻松缩放,无需停机。
耐用性:kafka使用分布式提交日志,这意味着消息会尽可能快速的保存在磁盘上,因此它是持久的。
性能:kafka对于发布和订阅消息都具有高吞吐量。即使存储了许多TB的消息,他也爆出稳定的性能。
kafka非常快:保证零停机和零数据丢失。
4、kafka应用场景
4.1、指标分析
kafka 通常用于操作监控数据。这设计聚合来自分布式应用程序的统计信息, 以产生操作的数据集中反馈
4.2、日志聚合解决方法
kafka可用于跨组织从多个服务器收集日志,并使他们以标准的合适提供给多个服务器。
4.3、流式处理
流式处理框架(spark,storm,flink)重主题中读取数据,对齐进行处理,并将处理后的数据写入新的主题,供 用户和应用程序使用,kafka的强耐久性在流处理的上下文中也非常的有用。
5、kafka架构
我们通常将Apache Kafka用在两类程序:
1.建立实时数据管道,以可靠地在系统或应用程序之间获取数据
2.构建实时流应用程序,以转换或响应数据流
上图,我们可以看到:
1.Producers:可以有很多的应用程序,将消息数据放入到Kafka集群中。
2.Consumers:可以有很多的应用程序,将消息数据从Kafka集群中拉取出来。
3.Connectors:Kafka的连接器可以将数据库中的数据导入到Kafka,也可以将Kafka的数据导出到
数据库中。
4.Stream Processors:流处理器可以从Kafka中拉取数据,也可以将数据写入到Kafka中。
特性 | ActiveMQ | RabbitMQ | Kafka | RocketMQ |
---|---|---|---|---|
所属社区/公司 | Apache | Mozilla Public License | Apache | Apache/Ali |
成熟度 | 成熟 | 成熟 | 成熟 | 比较成熟 |
生产者-消费者模式 | 支持 | 支持 | 支持 | 支持 |
发布-订阅 | 支持 | 支持 | 支持 | 支持 |
REQUEST-REPLY | 支持 | 支持 | - | 支持 |
API完备性 | 高 | 高 | 高 | 低(静态配置) |
多语言支持 | 支持JAVA优先 | 语言无关 | 支持,JAVA优先 | 支持 |
单机呑吐量 | 万级(最差) | 万级 | 十万级 | 十万级(最高) |
消息延迟 | - | 微秒级 | 毫秒级 | - |
可用性 | 高(主从) | 高(主从) | 非常高(分布式) | 高 |
消息丢失 | - | 低 | 理论上不会丢失 | - |
消息重复 | - | 可控制 | 理论上会有重复 | - |
事务 | 支持 | 不支持 | 支持 | 支持 |
文档的完备性 | 高 | 高 | 高 | 中 |
提供快速入门 | 有 | 有 | 有 | 无 |
首次部署难度 | - | 低 | 中 | 高 |
kafka整体架构图
一个典型的kafka集群中包含若干个Producer,若干个Broker,若干个Consumer,以及一个zookeeper集群; kafka通过zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行Rebalance(负载均 衡);Producer使用push模式将消息发布到Broker;Consumer使用pull模式从Broker中订阅并消费消息。
4、kafka术语
4.1、kafka中术语介绍
Broker
kafka集群中包含一个或者多个服务实例,这种服务实例被称为Broker,一个Broker可以理解为一台服务器
broker是无状态(Sateless)的,它们是通过ZooKeeper来维护集群状态
一个Kafka的broker每秒可以处理数十万次读写,每个broker都可以处理TB消息而不影响性能
zookeeper
ZK用来管理和协调broker,并且存储了Kafka的元数据(例如:有多少topic、partition、consumer)
ZK服务主要用于通知生产者和消费者Kafka集群中有新的broker加入、或者Kafka集群中出现故障的broker
Kafka正在逐步想办法将ZooKeeper剥离,维护两套集群成本较高,社区提出KIP-500就是要替换掉ZooKeeper的依赖。“Kafka on Kafka”——Kafka自己来管理自己的元数据
Topic
每条发布到kafka集群的消息都有一个类别,这个类别就叫做Topic
主题是一个逻辑概念,用于生产者发布数据,消费者拉取数据
Kafka中的主题必须要有标识符,而且是唯一的,Kafka中可以有任意数量的主题,没有数量上的限制
在主题中的消息是有结构的,一般一个主题包含某一类消息
一旦生产者发送消息到主题中,这些消息就不能被更新(更改)
分区(Partitions)
Partition是一个物理上的概念,每个Topic包含一个或者多个Partition
副本(Replicas)
副本可以确保某个服务器出现故障时,确保数据依然可用
在Kafka中,一般都会设计副本的个数>1
偏移量(offset)
offset记录着下一条将要发送给Consumer的消息的序号
默认Kafka将offset存储在ZooKeeper中
在一个分区中,消息是有顺序的方式存储着,每个在分区的消费都是有一个递增的id。这个就是偏移量offset
偏移量在分区中才是有意义的。在分区之间,offset是没有任何意义的
Producer
负责发布消息到kafka的Broker中。
Consumer
消息消费者,向kafka的broker中读取消息的客户端
Consumer Group
- 每一个Consumer属于一个特定的Consumer Group(可以为每个Consumer指定 groupName)
- consumer group是kafka提供的可扩展且具有容错性的消费者机制
- 一个消费者组可以包含多个消费者
- 一个消费者组有一个唯一的ID(group Id)
- 组内的消费者一起消费主题的所有分区数据
4.2、kafka中topic说明
kafka将消息以topic为单位进行归类
topic特指kafka处理的消息源(feeds of messages)的不同分类。
topic是一种分类或者发布的一些列记录的名义上的名字。kafka主题始终是支持多用户订阅的;也就是说,一 个主题可以有零个,一个或者多个消费者订阅写入的数据。
在kafka集群中,可以有无数的主题。
生产者和消费者消费数据一般以主题为单位。更细粒度可以到分区级别。
4.3、kafka中分区数(Partitions)
Partitions:分区数
一个broker服务下,是否可以创建多个分区?
可以的,broker数与分区数没有关系; 在kafka中,每一个分区会有一个编号:编号从0开始
某一个分区的数据是有序的
说明-数据是有序 如何保证一个主题下的数据是有序的?(生产是什么样的顺序,那么消费的时候也是什么样的顺序)
一个主题(topic)下面有一个分区(partition)即可
topic的Partition数量在创建topic时配置。
Partition数量决定了每个Consumer group中并发消费者的最大数量。(一个topic当中的每一个分区同一时间只能被 一个消费组里的一个线程进行消费)
Consumer group A 有两个消费者来读取4个partition中数据;Consumer group B有四个消费者来读取4个 partition中的数据
4.4、kafka中副本数( Partition Replication)
副本数(replication-factor):控制消息保存在几个broker(服务器)上,一般情况下等于broker的个数
一个broker服务下,是否可以创建多个副本因子?
不可以;创建主题时,副本因子应该小于等于可用的broker数。
副本因子操作以分区为单位的。每个分区都有各自的主副本和从副本;主副本叫做leader,从副本叫做 follower(在有多个副本的情况下,kafka会为同一个分区下的分区,设定角色关系:一个leader和N个 follower),处于同步状态的副本叫做in-sync-replicas(ISR);follower通过拉的方式从leader同步数据。消费 者和生产者都是从leader读写数据,不与follower交互。
副本因子的作用:让kafka读取数据和写入数据时的可靠性。
副本因子是包含本身|同一个副本因子不能放在同一个Broker中。
如果某一个分区有三个副本因子,就算其中一个挂掉,那么只会剩下的两个钟,选择一个leader,但不会在其 他的broker中,另启动一个副本(因为在另一台启动的话,存在数据传递,只要在机器之间有数据传递,就 会长时间占用网络IO,kafka是一个高吞吐量的消息系统,这个情况不允许发生)所以不会在零个broker中启 动。
如果所有的副本都挂了,生产者如果生产数据到指定分区的话,将写入不成功。
lsr表示:当前可用的副本
4.5、kafka Partition offset
任何发布到此partition的消息都会被直接追加到log文件的尾部,每条消息在文件中的位置称为offset(偏移量),
offset是一个long类型数字,它唯一标识了一条消息,消费者通过(offset,partition,topic)跟踪记录消费的偏移量。
4.6、kafka分区和消费组之间的关系
消费组: 由一个或者多个消费者组成,同一个组中的消费者对于同一条消息只消费一次。
消费组里消费者的个数,应该小于等于该主题下的分区数。如下所示:
如:某一个主题有4个分区,那么消费组中的消费者应该小于4,而且最好与分区数成整数倍
1 2 4
一个topic当中的每一个分区同一时间只能被 一个消费组里的一个线程进行消费
总结:分区数越多,同一时间可以有越多的消费者来进行消费,消费数据的速度就会越快,提高消费的性能
5、集群搭建:
验证zk集群是否可用:
bin/zkServer.sh status 一定要保证出现一个leader,其他的都是follower即可
5.1 kafka集群安装
下载安装包
http://archive.apache.org/dist/kafka/0.10.0.0/kafka_2.11-0.10.0.0.tgz
cd /export/servers/kafka_2.11-0.10.0.0/config
vim server.properties
broker.id=0 #
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/export/servers/kafka_2.11-0.10.0.0/logs #
num.partitions=2
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=node01:2181,node02:2181,node03:2181 #
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
delete.topic.enable=true #
host.name=node01 #
5.2、启动集群
1、启动命令-前台启动
node01服务器执行以下命令来启动kafka集群
cd /export/servers/kafka_2.11-0.10.0.0
bin/kafka-server-start.sh config/server.properties
2、启动命令-后台启动
node01执行以下命令将kafka进程启动在后台
cd /export/servers/kafka_2.11-0.10.0.0
nohup bin/kafka-server-start.sh config/server.properties 2>&1 &
3、停止命令
node01执行以下命令便可以停止kakfa进程
cd /export/servers/kafka_2.11-0.10.0.0
bin/kafka-server-stop.sh
6、Kafka集群操作(控制台操作)
6.1、创建一个Topic
创建了一个名字为test的主题, 有三个分区,有两个副本
node01执行以下命令来创建topic
cd /export/servers/kafka_2.11-0.10.0.0
bin/kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 2 --partitions 3 --topic test
6.2、查看主题命令
查看kafka当中存在的主题
node01使用以下命令来查看kafka当中存在的topic主题
cd /export/servers/kafka_2.11-0.10.0.0
bin/kafka-topics.sh --list --zookeeper node01:2181,node02:2181,node03:2181
6.3、生产者生产数据
模拟生产者来生产数据
node01服务器执行以下命令来模拟生产者进行生产数据
cd /export/servers/kafka_2.11-0.10.0.0
bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic test
6.4、消费者消费数据
node02服务器执行以下命令来模拟消费者进行消费数据
bin/ kafka-console-consumer.sh --from-beginning --topic test --zookeeper node01:2181,node02:2181,node03:2181
6.5、运行describe topics命令
node01执行以下命令运行describe查看topic的相关信息
bin/kafka-topics.sh --describe --zookeeper node01:2181 --topic test
结果说明:
这是输出的解释。第一行给出了所有分区的摘要,每个附加行提供有关一个分区的信息。由于我们只有一个分 区用于此主题,因此只有一行。
“leader”是负责给定分区的所有读取和写入的节点。每个节点将成为随机选择的分区部分的领导者。(因为在kafka中 如果有多个副本的话,就会存在leader和follower的关系,表示当前这个副本为leader所在的broker是哪一个)
“replicas”是复制此分区日志的节点列表,无论它们是否为领导者,或者即使它们当前处于活动状态。(所有副本列表 0 ,1,2)
“isr”是“同步”复制品的集合。这是副本列表的子集,该列表当前处于活跃状态并且已经被领导者捕获。(可用的列表 数)
6.6、修改topic属性
6.6.1、增加topic分区数
任意kafka服务器执行以下命令可以增加topic分区数
6.6.2、增加配置
动态修改kakfa的配置
任意kafka服务器执行以下命令可以增加topic分区数
bin/kafka-topics.sh --zookeeper node01:2181 --alter --topic test --config flush.messages=1
6.6.3、删除配置
动态删除kafka集群配置
bin/kafka-topics.sh --zookeeper node01:2181 --alter --topic test --delete-config flush.messages
6.6.4、删除topic
目前删除topic在默认情况下知识打上一个删除的标记,在重新启动kafka后才删除。如果需要立即删除,则需要在
server.properties中配置:
delete.topic.enable=true
然后执行以下命令进行删除topic
kafka-topics.sh --zookeeper zkhost:port --delete --topic topicName
kafka-producer-perf-test.sh
生产消息基准测试
在生产环境中,推荐使用生产5000W消息,这样会性能数据会更准确些。
bin/kafka-producer-perf-test.sh --topic benchmark --num-records 5000000
--throughput -1 --record-size 1000 --producer-props bootstrap.servers=node1.itcast.cn:9092,
node2.itcast.cn:9092,node3.itcast.cn:9092 acks=1
--topic topic的名字
--num-records 总共指定生产数据量(默认5000W)
--throughput 指定吞吐量——限流(-1不指定)
--record-size record数据大小(字节)
--producer-props bootstrap.servers=192.168.1.20:9092,192.168.1.21:9092,192.168.1.22:9092 acks=1 指定Kafka集群地址,ACK模式
kafka-consumer-perf-test.sh
bin/kafka-consumer-perf-test.sh --broker-list node1.itcast.cn:9092,
node2.itcast.cn:9092,node3.itcast.cn:9092 --topic benchmark
--fetch-size 1048576 --messages 5000000
bin/kafka-consumer-perf-test.sh
--broker-list 指定kafka集群地址
--topic 指定topic的名称
--fetch-size 每次拉取的数据大小
--messages 总共要消费的消息个数
7、kafka集群操作-JavaAPI操作
添加依赖
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version> 0.10.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>0.10.0.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- java编译插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
生产者代码
public class OrderProducer {
public static void main(String[] args) throws InterruptedException {
/* 1、连接集群,通过配置文件的方式
* 2、发送数据-topic:order,value
*/
Properties props = new Properties();
props.put("bootstrap.servers", "node01:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>
(props);
for (int i = 0; i < 1000; i++) {
// 发送数据 ,需要一个producerRecord对象,最少参数 String topic, V value
kafkaProducer.send(new ProducerRecord<String, String>("order", "订单信息"+i));
Thread.sleep(100);
}
}
}
消费者代码-自动提交offset值
public class OrderConsumer {
public static void main(String[] args) {
// 1\连接集群
Properties props = new Properties();
props.put("bootstrap.servers", "hadoop-01:9092");
props.put("group.id", "test");
//以下两行代码 ---消费者自动提交offset值
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>
(props);
// 2、发送数据 发送数据需要,订阅下要消费的topic。 order
kafkaConsumer.subscribe(Arrays.asList("order"));
while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(100);// jdk queue offer插入、poll获取元素。 blockingqueue put插入原生, take获取元素
for (ConsumerRecord<String, String> record : consumerRecords) {
System.out.println("消费的数据为:" + record.value());
}
}
}
消费者代码-手动提交offset
如果Consumer在获取数据后,需要加入处理,数据完毕后才确认offset,需要程序来控制offset的确认? 关闭自动提交确认选项
props.put("enable.auto.commit", "false");
手动提交offset值
kafkaConsumer.commitSync();
while(true)
{
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {
insertIntoDb(buffer);
// 手动提交offset值
consumer.commitSync();
buffer.clear();
}
}
commitSync将所有已接收的记录标记为已提交。
消费者代码-完成处理每个分区中的记录后提交偏移量
在某些情况下,您可能希望通过明确指定偏移量 来更好地控制已提交的记录。 以下我们在完成处理每个分区中的记录后提交偏移量。
public class ConmsumerPartition {
/**
* 处理完每一个分区里面的数据,就马上提交这个分区里面的数据
* @param args
*/
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "node01:9092");
props.put("group.id", "test_group");
props.put("enable.auto.commit", "false"); //禁用自动提交offset,后期我们手动提交offset
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
kafkaConsumer.subscribe(Arrays.asList("mypartition"));
while (true){
//通过while ture进行消费数据
ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
//获取mypartition这个topic里面所有的分区
Set<TopicPartition> partitions = records.partitions();
//循环遍历每一个分区里面的数据,然后将每一个分区里面的数据进行处理,处理完了之后再提交每一个分区里面的offset
for (TopicPartition partition : partitions) {
//获取每一个分区里面的数据
List<ConsumerRecord<String, String>> records1 = records.records(partition);
for (ConsumerRecord<String, String> record : records1) {
System.out.println(record.value()+"===="+ record.offset());
}
//获取我们分区里面最后一条数据的offset,表示我们已经消费到了这个offset了
long offset = records1.get(records1.size() - 1).offset();
//提交offset
//提交我们的offset,并且给offset加1 表示我们下次从没有消费的那一条数据开始消费
kafkaConsumer.commitSync(Collections.singletonMap(partition,new OffsetAndMetadata(offset + 1)));
}
}
}
}
注意事项:
提交的偏移量应始终是应用程序将读取的下一条消息的偏移量。 因此,在调用commitSync(偏移量)时,应该 在最后处理的消息的偏移量中添加一个
消费者代码-使用消费者消费指定分区的数据
1、如果进程正在维护与该分区关联的某种本地状态(如本地磁盘上的键值存储),那么它应该只获取它在磁盘上 维护的分区的记录。
2、如果进程本身具有高可用性,并且如果失败则将重新启动(可能使用YARN,Mesos或AWS工具等集群管理框 架,或作为流处理框架的一部分)。 在这种情况下,Kafka不需要检测故障并重新分配分区,因为消耗过程将在另 一台机器上重新启动。
public class ConsumerSomePartition {
//实现消费一个topic里面某些分区里面的数据
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
props.put("group.id", "test_group"); //消费组
props.put("enable.auto.commit", "true");//允许自动提交offset
props.put("auto.commit.interval.ms", "1000");//每隔多久自动提交offset
props.put("session.timeout.ms", "30000");
//指定key,value的反序列化类
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//获取kafkaConsumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//通过consumer订阅某一个topic,进行消费.会消费topic里面所有分区的数据
// consumer.subscribe();
//通过调用assign方法实现消费mypartition这个topic里面的0号和1号分区里面的数据
TopicPartition topicPartition0 = new TopicPartition("mypartition", 0);
TopicPartition topicPartition1 = new TopicPartition("mypartition", 1);
//订阅我们某个topic里面指定分区的数据进行消费
consumer.assign(Arrays.asList(topicPartition0,topicPartition1));
int i =0;
while(true){
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
i++;
System.out.println("数据值为"+ record.value()+"数据的offset为"+ record.offset());
System.out.println("消费第"+i+"条数据");
}
}
}
}
注意事项:
1、要使用此模式,您只需使用要使用的分区的完整列表调用assign(Collection),而不是使用subscribe订阅 主题。
2、主题与分区订阅只能二选一
消费者数据丢失-数据重复
说明:
1、已经消费的数据对于kafka来说,会将消费组里面的offset值进行修改,那什么时候进行修改了?是在数据消费 完成之后,比如在控制台打印完后自动提交;
2、提交过程:是通过kafka将offset进行移动到下个message所处的offset的位置。
3、拿到数据后,存储到hbase中或者mysql中,如果hbase或者mysql在这个时候连接不上,就会抛出异常,如果在处理数据的时候已经进行了提交,那么kafka上的offset值已经进行了修改了,但是hbase或者mysql中没有数据,这个时候就会出现数据丢失。
4、什么时候提交offset值?在Consumer将数据处理完成之后,再来进行offset的修改提交。默认情况下offset是 自动提交,需要修改为手动提交offset值。
5、如果在处理代码中正常处理了,但是在提交offset请求的时候,没有连接到kafka或者出现了故障,那么该次修 改offset的请求是失败的,那么下次在进行读取同一个分区中的数据时,会从已经处理掉的offset值再进行处理一 次,那么在hbase中或者mysql中就会产生两条一样的数据,也就是数据重复
Future接口
public interface Future<V>
Future表示异步计算的结果。方法用于检查计算是否完成、等待计算完成以及检索计算的结果。结果只能在计算完成后使用get方法检索,必要时阻塞直到准备好。取消是由取消方法执行的。还提供了其他方法来确定任务是否正常完成或被取消。一旦计算完成,计算不能取消。如果为了可取消性而使用Future,但不提供可用的结果,可以声明Future<?>并返回null作为底层任务的结果。
Future常用方法
boolean cancel(boolean mayInterruptIfRunning)
//试图取消此任务的执行。如果任务已经完成、已经取消或由于其他原因无法取消,则此尝试将失败。
//如果成功,且调用cancel时此任务尚未启动,则此任务将永远不会运行。如果任务已经启动,那么
//mayInterruptIfRunning参数确定是否应该中断执行该任务的线程以试图停止该任务。
boolean isCancelled();
//如果该任务在正常完成之前被取消,则返回true。
boolean isDone();
//如果任务完成,返回true。完成可能是由于正常终止、异常或取消——在所有这些情况下,该方法将返回true。
V get()
//等待计算完成,然后检索其结果。
V get(long timeout, TimeUnit unit)
//如果需要,最多等待给定的时间来完成计算,然后检索其结果
TimeUnit
public enum TimeUnit
TimeUnit表示给定粒度单元上的持续时间,并提供实用方法来在各个单元之间进行转换,并在这些单元中执行计时和延迟操作。
TimeUnit不维护时间信息,但只帮助组织和使用可能在不同上下文中分别维护的时间表示。纳秒的定义是千分之一微秒,一微秒是千分之一毫秒,一毫秒是千分之一秒,一分钟是60秒,一小时是60分钟,一天是24小时。
NANOSECONDS
MICROSECONDS
MILLISECONDS
SECONDS
MINUTES
HOURS
DAYS
TimeUnit常用方法
public long convert(long sourceDuration, TimeUnit sourceUnit)
//将给定单位中的给定持续时间转换为此单位
//For example, to convert 10 minutes to milliseconds, use:
//TimeUnit.MILLISECONDS.convert(10L, TimeUnit.MINUTES)
KafkaProducer
public class KafkaProducer<K, V> implements Producer<K, V>
发布记录到Kafka集群的Kafka客户端。
生产者是线程安全的,在线程之间共享一个生产者实例通常比拥有多个实例要快。
生产者由一个缓冲空间池和一个后台I/O线程组成,后者负责将这些记录转换为请求并将它们传输给集群。如果在使用后没有关闭生产者,将会泄漏这些资源。
KafkaProducer常用方法
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)
//异步发送记录到主题,并在确认发送后调用提供的回调。
//发送是异步的,一旦记录被存储在等待发送的记录的缓冲区中,该方法将立即返回。这允许并行地发送许多记录,
//而不必阻塞以等待每个记录之后的响应。
//发送的结果是一个RecordMetadata,它指定了记录被发送到的分区、它被分配的偏移量和记录的时间戳。如果
//主题使用CreateTime,则时间戳将是用户提供的时间戳,或者如果用户没有为记录指定时间戳,则记录发送时间
//如果LogAppendTime用于主题,时间戳将是消息被附加时Kafka代理的本地时间。
public Future<RecordMetadata> send(ProducerRecord<K, V> record)
//相当于send(record, null)
ProducerRecord
public final class ProducerRecord<K, V>
要发送给Kafka的键/值对。这包括要将记录发送到的主题名、一个可选的分区号以及一个可选的键和值。
如果指定了有效的分区号,则在发送记录时将使用该分区。如果没有指定分区,但存在一个键,那么将使用键的散列选择分区。如果键和分区都不存在,那么分区将以循环的方式分配。该记录还具有相关的时间戳。如果用户没有提供时间戳,生产者将用当前时间戳。Kafka最终使用的时间戳取决于为主题配置的时间戳类型。
ProducerRecord构造方法
public ProducerRecord(String topic, Integer partition,
Long timestamp, K key, V value)
public ProducerRecord(String topic, Integer partition, K key, V value)
public ProducerRecord(String topic, K key, V value)
public ProducerRecord(String topic, V value)
ProducerRecord常用方法
public String topic()
//返回此记录要发送到的主题
public Integer partition()
public Long timestamp()
public K key()
public V value()
KafkaConsumer
public class KafkaConsumer<K, V> implements Consumer<K, V>
消费来自Kafka集群的记录的Kafka客户端。
它将透明地处理Kafka集群中的服务器故障,并透明地适应它获取的数据分区在集群内迁移。该客户机还与服务器进行交互,以允许消费者组使用消费者组对消费进行负载平衡(如下所述)。
消费者维护到必要的brokers的TCP连接以获取数据。如果在使用后没有关闭消费者,这些连接将会泄漏。消费者不是线程安全的
KafkaConsumer构造方法
public KafkaConsumer(Properties properties)
//通过提供一个Properties对象作为配置来实例化消费者。有效的配置记录在ConsumerConfig中
KafkaConsumer常用方法
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener)
//订阅给定的主题列表以获得动态分配的分区。主题订阅不是增量的。这个列表将替换当前的赋值(如果有的话)。
//注意,不能通过assign(Collection)将主题订阅与组管理以及手动分区分配结合起来。如果给定的主题
//列表为空,则将其视为unsubscribe()。
public void subscribe(Collection<String> topics)
public void unsubscribe()
//取消订阅当前订阅的主题。这还会清除通过assign(Collection)直接分配的任何分区
public ConsumerRecords<K, V> poll(long timeout)
//使用subscribe/assign APIs之一从指定的主题或分区获取数据。
//在轮询数据之前没有订阅任何主题或分区是错误的。
//timeout:
//如果缓冲区中没有可用的数据,在轮询中等待所用的时间(以毫秒为单位)。
//如果为0,则立即返回缓冲区中当前可用的任何记录,否则返回空。一定不是负的。
public void assign(Collection<TopicPartition> partitions)
//手动将分区列表分配给该用户。该接口不允许增量赋值,将替换之前的赋值(如果有的话)。
public void commitSync()
//提交在最后一次poll()中为所有订阅的主题和分区列表返回的偏移量。
//这将只向Kafka提交偏移量。使用这个API提交的偏移量将在每次重平衡后的第一次取回和启动时
//使用。因此,如果你需要在Kafka以外的任何地方存储偏移量,这个API不应该被使用
ConsumerRecord
public final class ConsumerRecord<K, V>
从Kafka接收的键/值对。这包括一个主题名和一个分区号,一个指向Kafka分区中的记录的偏移量。
ConsumerRecord构造方法
public ConsumerRecord(String topic,
int partition,
long offset,
K key,
V value)
//创建从指定主题和分区接收的记录
public ConsumerRecord(String topic,
int partition,
long offset,
long timestamp,
TimestampType timestampType,
long checksum,
int serializedKeySize,
int serializedValueSize,
K key,
V value)
ConsumerRecord常用方法
public K key()
//返回键
public V value()
//返回值
public long offset()
public long timestamp()
public int partition()
public String topic()
ConsumerRecords
一个容器,用于保存特定主题的每个分区的ConsumerRecord列表。对于Consumer.poll(long)操作返回的每个主题分区,都有一个ConsumerRecord列表。
ConsumerRecords常用方法
public Set<TopicPartition> partitions()
//在该记录集中包含数据的分区集合(如果没有返回数据,则可能为空)
public List<ConsumerRecord<K, V>> records(TopicPartition partition)
//只获取给定分区的记录
ConsumerConfig
消费者配置 keys
GROUP_ID_CONFIG = "group.id"
MAX_POLL_RECORDS_CONFIG = "max.poll.records"
HEARTBEAT_INTERVAL_MS_CONFIG = "heartbeat.interval.ms"
ENABLE_AUTO_COMMIT_CONFIG = "enable.auto.commit"
...
开发StreamAPI
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import java.util.Properties;
public class Stream {
/**
* 通过streamAPI实现将数据从test里面读取出来,写入到test2里面去
* @param args
*/
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(StreamsConfig.APPLICATION_ID_CONFIG,"bigger");
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"node01:9092");
properties.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());//key的序列化和反序列化的类
properties.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());
//获取核心类 KStreamBuilder
KStreamBuilder kStreamBuilder = new KStreamBuilder();
//通过KStreamBuilder调用stream方法表示从哪个topic当中获取数据
//调用mapValues方法,表示将每一行value都给取出来
//line表示我们取出来的一行行的数据
//将转成大写的数据,写入到test2这个topic里面去
kStreamBuilder.stream("test").mapValues(line -> line.toString().toUpperCase()).to("test2");
//通过kStreamBuilder可以用于创建KafkaStream 通过kafkaStream来实现流失的编程启动
KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, properties);
kafkaStreams.start(); //调用start启动kafka的流 API
}
}
更多推荐
所有评论(0)