Kafka发送消息和消费消息的方式
一.发送消息 Kafka向 Broker 发送消息的方式,可以分为三种,分别是 Fire-and-forget、Synchronous send、Asynchronous send。示例代码:public class ProducerSendServiceTest {/*** 基本属性*/private static KafkaProducer&...
一.发送消息
Kafka向 Broker 发送消息的方式,可以分为三种,分别是 Fire-and-forget、Synchronous send、Asynchronous send。
示例代码:
public class ProducerSendServiceTest {
/**
* 基本属性
*/
private static KafkaProducer<String, String> producer;
/**
* 主题
*/
private static String TOPIC = "topic-send-test";
static {
Properties properties = new Properties();
properties.put("bootstrap.servers", "127.0.0.1:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<>(properties);
}
// 发送消息的不同方式
}
1.发送了不管(Fire-and-forget)
这种方式是不管发送成功与否,客户端都会返回成功。尽管大多数的时候 Kafka 在发送失败后,会自己重新自动再一次发送消息,但是也会存在丢失消息的风险。
/**
* 第一种方式:直接发送,不管结果
*/
private static void fireAndForget() {
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "Msg-Fire-Forget",
"Welcome to my home!!! ");
try {
producer.send(record);
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.close();
}
}
2.同步方式(Synchronous send)
这种方式是同步发送的方式,会等待 future 对象的返回来判断是否发送成功。
/**
* 第二种方式:同步方式
*/
private static void synchronousSend() {
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "Msg-Sync",
"Sync Message");
try {
// 获取 Broker 的返回信息
RecordMetadata rec = producer.send(record).get();
System.out.println(rec.topic());
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.close();
}
}
3.异步回调(Asynchronous send)
异步发送基于实现了send() 方法的回调函数。
/**
* 第三种方式:异步回调方式
*/
private static void asynchronousSend() {
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "Msg-Async",
"Async Message");
try {
System.out.println("Staring Sending....");
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
System.out.println("Got FeedBack....");
}
});
System.out.println("Stop Sending....");
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.close();
}
}
或者使用单独的类去实现接口:org.apache.kafka.clients.producer.Callback
private static class MyProducerCallback implements Callback {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
e.printStackTrace();
return;
}
System.out.println(recordMetadata.topic());
System.out.println(recordMetadata.partition());
System.out.println(recordMetadata.offset());
System.out.println("Coming in MyProducerCallback");
}
}
二.消费消息
KafkaConsumer(消费者)每次调用 poll() 方法,它总是返回由生产者写入 Kafka 但还没有被消费者读取过的记录, 我们因此可以追踪到哪些记录是被群组里的哪个消费者读取的。Kafka 不会像其他 JMS 队列那样需要得到消费者的确认,这是 Kafka 的一个独特之处。相反,消费者可以使用 Kafka 来追踪消息在分区里的位置(偏移量)。
我们把更新分区当前位置的操作叫作提交。
那么消费者是如何提交偏移量的呢?
消费者往一个 叫作 _consumer_offset 的特殊主题发送消息,消息里包含每个分区的偏移量。 如果消费者一直处于运行状态,那么偏移量就没有 什么用处。不过,如果消费者发生崩溃或者有新的消费者加入群组,就会触发
再
均
衡
\color{#FF3030}{再均衡}
再均衡,再均衡之后,每个消费者可能分配到新的分区,而不是之前处理的那个。为了能够继续之前的工作,消费者需要读取每个分区最后一次提交的偏移量,然后从偏移量指定的地方继续处理。
如果提交的偏移量小于客户端处理的最后一个消息的偏移量 ,那么处于两个偏移量之间的消息就会被重复处理
,如下图所示。
如果提交的偏移量大于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息就会被丢失
,如下图所示。
所以,处理偏移量的方式对客户端会有很大的影响。 KafkaConsumer API 提供了很多种方式来提交偏移量。
代码示例:
public class KafkaConsumerTest {
private static Properties properties = new Properties();
static {
properties.put("bootstrap.servers", "127.0.0.1:9092");
properties.put("group.id", "test");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
}
// 提交偏移量的方式
}
1.自动提交偏移量
最简单的方式是让消费者自动提交偏移量。如果 enable.auto.commit被设为true
,那么每过5s,消费者会自动把从poll()方法接收到的最大偏移量提交上去。提交时间间隔由auto.commit.interval.ms
控制,默认值是5s。自动提交也是在轮询里进行的,消费者每次在进行轮询时会检查是否该提交偏移量了,如果是,那么就会提交从上一次轮询返回的偏移量。
在使用自动提交时,每次调用轮询方法都会把上一次调用返回的偏移量提交上去,它并不知道具体哪些消息已经被处理了,所以再次调用之前最好确保所有当前调用返回的消息都已经处理完毕(在调用close() 方法之前也会进行自动提交)。在一般情况下不会有什么问题,不过在处理异常或提前退出轮询时要格外小心 。
示例:假设我们仍然使用默认的 5s 提交时间间隔,在最近一次提交之后的 2s 发生了再均衡,再 均衡之后,消费者从最后一次提交的偏移量位置开始读取消息。这个时候偏移量已经落后了 2s,所以在这 2s 内到达的消息会被重复处理
。可以通过修改提交时间间隔来更频繁地提交偏移量,减小可能出现重复消息的时间窗,不过这种情况是无也完全避免的 。
代码示例:
/**
* 自动提交偏移量
*/
public static void commitAuto() {
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
// 订阅主题,可传入一个主题列表,也可以是正则表达式,如果有人创建了与正则表达式匹配的新主题,
// 会立即触发一次再均衡,消费者就可以读取新添加的主题。
// 如:test.*,订阅test相关的所有主题
consumer.subscribe(Collections.singleton("test_partition"));
try {
while (true) {
// 消费者持续对kafka进行轮训,否则会被认为已经死亡,它的分区会被移交给群组里的其他消费者。
// 传给poll方法的是一个超时时间,用于控制poll()方法的阻塞时间(在消费者的缓冲区里没有可用数据时会发生阻塞)
// 如果该参数被设为0,poll会立即返回,否则它会在指定的毫秒数内一直等待broker返回数据
// poll方法返回一个记录列表。每条记录包含了记录所属主题的信息、记录所在分区的信息、记录在分区里的偏移量,以及记录的键值对。
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println(String.format("topic=%s, partition=%s, offset=%d, key=%s, value=%s",
record.topic(), record.partition(), record.offset(), record.key(), record.value()));
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 退出应用前使用close方法关闭消费者。
// 网络连接和socket也会随之关闭,并立即触发一次再均衡,而不是等待群组协调器发现它不在发送心跳并认定它已死亡,
// 因为那样需要更长的时间,导致政哥群组在一段时间内无法读取消息。
consumer.close();
}
}
2.手动同步提交当前偏移量
大部分开发者通过控制偏移量提交时间来减小丢失消息的可能性,并在发生再均衡时减少重复消息的数量。消费者 API提供了另一种提交偏移量的方式, 开发者可以在必要的时候提交当前偏移盘,而不是基于时间间隔。
把
e
n
a
b
l
e
.
a
u
t
o
.
c
o
m
m
i
t
设
为
f
a
l
s
e
\color{#FF3030}{把enable.auto.commit设为false}
把enable.auto.commit设为false,让应用程序决定何时提交偏移量。使用
c
o
m
m
i
t
S
y
n
c
(
)
\color{#FF3030}{commitSync()}
commitSync()提交偏移量最简单可靠。这个API会提交由 poll() 方法返回的最新偏移量,提交成功后马上返回,如果提交失败就抛出异常。在处理完所有记录后要确保调用了commitSync(),
否
则
还
是
会
有
丢
失
消
息
的
风
险
\color{#FF3030}{否则还是会有丢失消息的风险}
否则还是会有丢失消息的风险。如果发生了再均衡,从最近一批消息到发生再均衡之间的所有消息都将被重复处理。
代码示例:
/**
* 手动同步提交当前偏移量
*/
public static void commitSelfSync() {
// 关闭自动提交偏移量,改用手动提交,与下方consumer.commitSync();一起使用
properties.put("enable.auto.commit", false);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singleton("test_partition"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println(String.format("topic=%s, partition=%s, offset=%d, key=%s, value=%s",
record.topic(), record.partition(), record.offset(), record.key(), record.value()));
}
consumer.commitSync();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
consumer.close();
}
}
3.手动异步提交当前偏移量
同步提交有一个不足之处,在 broker 对提交请求作出回应之前,应用程序会一直阻塞,这样会限制应用程序的吞吐量。我们可以通过
降
低
提
交
频
率
(
如
:
批
量
提
交
)
\color{#FF3030}{降低提交频率(如:批量提交)}
降低提交频率(如:批量提交)来提升吞吐量,但如果发生了再均衡,会增加重复消息的数量。
这个时候可以使用异步提交 API。我们只管发送提交请求,无需等待 broker 的响应。
异步的缺点
在成功提交或碰到无怯恢复的错误之前, commitSync() 会一直重试(应用程序也一直阻塞),但是 commitAsync() 不会,这也是 commitAsync() 一个不好的地方。它之所以不进行重试,是因为在它收到 服务器响应的时候,可能有一个更大的偏移量已经提交成功。假设我们发出一个请求用于提交偏移量 2000,这个时候发生了短暂的通信问题,服务器收不到请求,自然也不会作出任何响应。与此同时,我们处理了另外一批消息,并成功提交了偏移量 3000。如果 commitAsync() 重新尝试提交偏移量 2000,它有可能在偏移量 3000 之后提交成功。这个时候如果
发
生
再
均
衡
\color{#FF3030}{发生再均衡}
发生再均衡,就会出现重复消息。
我们之所以提到这个问题的复杂性和提交顺序的重要性,是因为 commitAsync() 也支持回调,在 broker 作出响应时会执行回调。回调经常被用于记录提交错误或生成度量指标, 不过
如
果
你
要
用
它
来
进
行
重
试
,
一
定
要
注
意
提
交
的
顺
序
\color{#FF3030}{如果你要用它来进行重试, 一定要注意提交的顺序}
如果你要用它来进行重试,一定要注意提交的顺序。
异步提交重试示例:
我们可以使用一个【单调递增的序列号】来维护异步提交的顺序。在每次提交偏移量之后或在回调里提交偏移量时递增序列号。在进行重试前,先检查回调的序列号和即将提交的偏移量是否相等,如果相等,说明没有新的提交,那么可以安全地进行重试。如果序列号比较大,说明有一个新的提交已经发送出去了,应该停止重试。
代码示例:
public static void commitSelfAsync() {
//关闭自动提交偏移量,改用手动提交,与下方consumer.commitAsync();一起使用
properties.put("enable.auto.commit", false);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singleton("test_partition"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println(String.format("topic=%s, partition=%s, offset=%d, key=%s, value=%s",
record.topic(), record.partition(), record.offset(), record.key(), record.value()));
}
// 异步提交:提交最后一个偏移量,然后继续做其他事情
// consumer.commitAsync();
// 发送提交请求,然后继续做其他事情,如果提交失败,错误信息和偏移量会被记录下来。可考虑重试。
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (null != exception) {
System.out.println(String.format("==== Commit failed for offsets %s, error:%s ====", offsets, exception.getMessage()));
}
}
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
consumer.close();
}
}
4.同步和异步组合提交偏移量
一般情况下,针对偶尔出现的提交失败,不进行重试不会有太大问题,因为如果提交失败是因为临时问题导致的,那么后续的提交总会有成功的,所以可以使用 commitAsync() 方法来提交,这样速度更快;但如果这是发生在关闭消费者或再均衡前的最后一次提交,就要确保能够提交成功,就需要使用 commitSync() ;来提交偏移量。
如果直接关闭消费者,就没有所谓的下一次提交
了,使用 commitSync() 方法会一直重试,直到提交成功或者发生无法恢复的错误。即:组合使用 commitAsync() 和 commitSync()。
示例代码:
public static void commitSelfSyncAndAsync() {
// 关闭自动提交偏移量,改用手动提交
properties.put("enable.auto.commit", false);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singleton("test_partition"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println(String.format("topic=%s, partition=%s, offset=%d, key=%s, value=%s",
record.topic(), record.partition(), record.offset(), record.key(), record.value()));
}
//异步提交(结合下方同步提交)
consumer.commitAsync();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//同步提交
consumer.commitSync();
consumer.close();
}
}
5.提交特定的偏移量
上述几种方式的提交偏移量的频率与处理消息批次的频率是一样的,如果poll()方法返回一大批数据,为了避免因再均衡引起的重复处理整批消息,可以在调用commitAsync()和commitSync()方法时传进去希望提交的分区和偏移量的map。
上述几种方式的提交偏移量的频率与处理消息批次的频率是一样的。但如果想要更频繁地提交出怎么办?如果 poll() 方法返回一大批数据,为了避免因再均衡引起的重复处理整批消息,想要在批次中间提交偏移量该怎么办
?这种情况无法通过调用 commitSync() 或 commitAsync() 来实现,因为它们只会提交最后一个偏移量
,而此时该批次里的消息还没有处理完。
幸运的是,消费者 API 允许在调用 commitSync() 和 commitAsync() 方法时传进去希望提交的分区和偏移量的 map。假设你处理了半个批次的消息, 最后一个来自主题“customers” 分区 3 的消息的偏移量是 5000, 你可以调用 commitSync() 方法来提交它。不过,因为消费者可能不只读取一个分区, 你需要跟踪所有分区的偏移量,所以在这个层面上控制偏移量的提交会让代码变复杂。
代码示例:
/**
* 提交特定的偏移量
*/
public static void commitSelfAppoint() {
// 用于跟踪偏移量的 map
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
int count = 0;
//关闭自动提交偏移量,改用手动提交
properties.put("enable.auto.commit", false);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singleton("test_partition"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// 消息的临时处理方案
System.out.println(String.format("topic=%s, partition=%s, offset=%d, key=%s, value=%s",
record.topic(), record.partition(), record.offset(), record.key(), record.value()));
// 在读取每条记录之后,使用期望处理的下一个消息的偏移量更新map里的偏移量,下一次就从这里开始读取消息
currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1, "no metadata"));
// 每处理 1000 条记录就提交一次偏移量。在实际运用中,可以根据时间或记录的内容进行提交
if (count % 1000 == 0) {
// 每1000次提交一次,还可以根据时间间隔来提交
// 这里调用的是的commitAsync(),也可以调用commitSync()。当然,在提交特定偏移量的时候,仍然要处理可能发生的错误
consumer.commitAsync(currentOffsets, new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (null != exception) {
System.out.println(String.format("==== Commit failed for offsets %s, error:%s ====", offsets, exception.getMessage()));
}
}
});
}
count++;
}
// 异步提交
consumer.commitAsync();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
consumer.close();
}
}
三.扩展
1.再均衡监听器
在前面的提交偏移量中提到过,消费者在退出和进行分区再均衡之前,会做一些清理工作。
你会在消费者失去对一个分区的所有权之前提交最后一个已处理记录的偏移量。如果消费者准备了一个缓冲区用于处理偶发的事件,那么在失去分区所有权之前, 需要处理在缓冲区累积下来的记录。你可能还需要关闭文件句柄、数据库连接等。
在为消费者分配新分区或移除旧分区时,可以通过消费者 API 执行一些应用程序代码,在调用 subscribe() 方法时传进去一个 ConsumerRebalancelistener 实例就可以了。 ConsumerRebalancelistener 有两个需要实现的方法。
public void onPartitionsRevoked(Collection<TopicPartition> partitions)
方法会在再均衡开始之前
和消费者停止读取消息之后
被调用。如果在这里提交偏移量,下一个接管分区的消费者就知道该从哪里开始读取了。public void onPartitionsAssigned(Collection<TopicPartition> partitions)
方法会在重新分配分区之后
和消费者开始读取消息之前
被调用。
示例:如何在失去分区所有权之前通过 onPartitionsRevoked() 方法来提交偏移量?
private Map<TopicPartition, OffsetAndMetadata> cueentOffsets = new HashMap<>();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
// 1.首先实现 ConsumerRebalanceListener 接口
private class HandleRebalance implements ConsumerRebalanceListener {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// 如果发生再均衡,我们要在即将失去分区所有权时提交偏移量。要注意,这里提交的是最近
// 处理过的偏移量,而不是批次中还在处理的最后一个偏移量。因为分区有可能在我们在处理
// 消息的时候被再均衡。我们要提交所有分区的偏移量,而不只是那些即将失去所有权的分区
// 的偏移量 -- 因为提交的偏移量是已经处理过的,所以不会有什么问题。调用commitSync()
// 方法,确保在再均衡发生之前提交偏移量。
System.out.println("Lost partitions in rebalance." +
"Committing current offsets:" + cueentOffsets);
consumer.commitSync(cueentOffsets);
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// 2.在获得新分区后开始读取消息,不需要做其他事情
}
}
public void consumerRebalanceListenertest() {
try {
// 4.把 ConsumerRebalanceListener 对象传给subscribe() 方法
consumer.subscribe(Collections.singleton("test_partition"), new HandleRebalance());
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println(String.format("topic=%s, partition=%s, offset=%d, key=%s, value=%s",
record.topic(), record.partition(), record.offset(), record.key(), record.value()));
cueentOffsets.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1, "no metadata"));
}
consumer.commitSync(cueentOffsets, null);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
consumer.commitSync(cueentOffsets);
} finally {
consumer.close();
}
}
}
2.从特定偏移量处开始处理记录
到目前为止,我们知道了如何使用 poll() 方法从各个分区的最新偏移量处开始处理消息
。 不过,有时候我们也需要从特定的偏移量处开始读取消息。
如果你想从分区的起始位置开始读取消息,或者直接跳到分区的末尾开始读取消息, 可以使用 seekToBeginning(Collection<TopicPartition> tp)
和 seekToEnd(Collection<TopicPartition> tp)
这两个方法。
不过, Kafka也为我们提供了用于查找特定偏移量的 API
。 它有很多用途,比如:向后回退几个消息或者向前跳过几个消息(对时间比较敏感的应用程序在处理滞后的情况下希望能够向前跳过若干个消息)。在使用 Kafka 以外的系统来存储偏移量时,它将给我们带来更大的便利。
假设有这样的运用场景:应用程序从 Kafka 读取事件(可能是网站的用户点击事件流 ),对它们进行处理(可能是使用自动程序清理点击操作并添加会话信息),然后把结果保存到数据库、 NoSQL 存储引擎或 Hadoop。假设我们真的不想丢失任何数据,也不想在数据库里多次保存相同的结果(消费端角度的精确一次消费)
。
这种情况下,消费者的示例代码如下 :
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println(String.format("topic=%s, partition=%s, offset=%d, key=%s, value=%s",
record.topic(), record.partition(), record.offset(), record.key(), record.value()));
cueentOffsets.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1, "no metadata"));
}
// 1、保存结果记录到DB
sotoreRecordDb(record);
// 2、提交偏移量
consumer.commitSync(cueentOffsets);
}
在这个例子里,每处理一条记录就提交一次偏移量。尽管如此, 在记录被保存到数据库之后以及偏移量被提交之前,应用程序仍然有可能发生崩溃,导致重复处理数据,数据库里就会出现重复记录。
如果保存记录和偏移量可以在一个原子操作里完成
,就可以避免出现上述情况。记录和偏移量要么都被成功提交,要么都不提交。如果记录是保存在数据库里而偏移量是提交到 Kafka 上,那么就无法实现原子操作。
不过 ,如果在同一个事务里把记录和偏移量都写到数据库里会怎样呢
?那么我们就会知道 记录和偏移量要么都成功提交,要么都没有,然后重新处理记录。
现在的问题是:如果偏移量是保存在数据库里而不是 Kafka 里,那么消费者在得到新分区时怎么知道该从哪里开始读取? 这个时候可以使用 seek() 方法
。在消费者启动或分配到新分区时,可以使用 seek() 方法查找保存在数据库里的偏移量。
下面的例子大致说明了如何使用这个 API。 使用 ConsumerRebalancelistener
和 seek()
方法确保我们是从数据库里保存的偏移量所指定的位置开始处理消息的。
ConsumerRebalanceListener 监听器代码示例:
public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// 1、通过该方法提交数据库事务。大致过程为:在处理完记录之后,将记录和偏移量
// 插入数据库,然后在即将失去分区所有权之前(onPartitionsRevoked)提交事务,
// 确保成功保存了这些信息。
commitTransactionToDb();
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
for (TopicPartition partition : partitions) {
// 2、使用该方法从数据库获取偏移量,在分配到新分区的时候,
// 使用seek()方法定位到对应的偏移量位置。
consumer.seek(partition, getOffsetFromDb(partition));
}
}
}
消费者使用代码示例:
public void processingRecordsTest() {
try {
consumer.subscribe(Collections.singleton("test_partition"), new SaveOffsetsOnRebalance());
consumer.poll(Duration.ofMillis(0));
// 3、订阅主题之后,开始启动消费者,我们调用一次 poll() 方法,让消费者加入到消费者群组里,并获取分配到的分区,
// 然后马上调用 seek() 方法定位分区的偏移量。要注意,seek() 方法只更新我们正在使用的位置,在下一次调用 poll()
// 时就可以获得正确的消息。如果 seek() 发生错误(比如偏移量不存在), poll()就会抛出异常。
for (TopicPartition partition : consumer.assignment()) {
consumer.seek(partition, getOffsetFromDb(partition));
}
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
processingRecords(record);
storeRecordDb(record);
// 4、该方法主要更新的是数据库里用于保存偏移量的表。假设更新记录的速度非常快,
// 那么每条记录都需要更新一次数据库,会导致提交的速度比较慢。所以只在每个批次末尾提交一次,
// 这里也可以通过别的方式来优化。
storeOffsetDb(record.topic(), record.partition(), record.offset());
}
commitTransactionToDb();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
} finally {
consumer.close();
}
}
}
通过把偏移量和记录保存到同 一个外部系统来实现单次语义可以有很多种方式,不过它们都需要结合使用 ConsumerRebalancelistener 和 seek() 方法来确保能够及时保存偏移量,并保证消费者总是能够从正确的位置开始读取消息。
3.如何退出
之前讨论轮询时就说过,不需要担心消费者会在一个无限循环里轮询消息,我们会告诉消费者如何优雅地退出循环。
如果确定要退出循环,需要通过另一个线程调用 consumer.wakeup()
方法。如果循环运行在主线程里,可以在 ShutdownHook
里调用该方法。要记住, consumer.wakeup() 是消费者唯一一个可以从其他线程里安全调用的方法
。调用 consumer.wakeup() 可以退出 poll(), 并抛出 WakeupException 异常,或者如果调用 consumer.wakeup() 时没有线程等待轮询, 那么异常将会在下一轮调用 poll() 时抛出。我们不需要处理 WakeupException,因为它只是用于跳出循环的一种方式。不过, 在退出线程之前调用 consumer.close()
是很有必要的,它会提交任何还没有提交的东西, 并向群组协调器(broker)发送消息,告知自己要离开群组,接下来就会触发再均衡 ,而不需要等待会话超时。
示例:下面是运行在主线程上的消费者退出线程的代码
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
System.out.println("Starting exit...");
// 1、ShutdownHook运行在单独的线程里,所以退出循环最安全的方式
// 只能是调用 wakeup() 方法。
consumer.wakeup();
try {
mainThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
消费者使用代码示例:
public void processingRecordsTest() {
try {
consumer.subscribe(Collections.singleton("test_partition"), new SaveOffsetsOnRebalance());
consumer.poll(Duration.ofMillis(0));
// 这里是死循环,直到按下Ctrl + C 键,关闭的钩子会在退出时进行清理
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
System.out.println(" --- shutdown --- " + System.currentTimeMillis());
for (ConsumerRecord<String, String> record : records) {
System.out.println("offset:" + record.offset() + ",key:" + record.key() +
",value:" + record.value());
}
for (TopicPartition partition: consumer.assignment()){
System.out.println("commit offset partition:" + consumer.position(partition));
}
consumer.commitSync();
}
} catch (Exception e) {
// 2、在另一个线程里调用 wakeup() 方法,导致 poll() 抛出 WakeupException。
// 你可以捕获异常以确保应用不会意外终止,但实际上这不是必需的。
e.printStackTrace();
} finally {
try {
} finally {
// 3、在退出之前,确保彻底关闭了消费者。
consumer.close();
System.out.println("consumer close...");
}
}
}
更多推荐
所有评论(0)