Kafka(四)消费者消费JSON消息+使用统一反序列化器+提升吞吐量
在生产者端,我们发送自定义的对象时,利用自定义序列化类将其序列化为JSON。在消费者端,我们同样需要自定义反序列类将JSON转为我们之前的对象@Override在处理消费者相关逻辑时,我们重点关系如何确保消息不重复消费以及如何增加消费者的吞吐量消费逻辑尽可能保证处理速度快,尽量减少耗时的逻辑。
文章目录
在 上一篇文章里,对于生产者,发送时失败之后会由定时任务进行重新发送, 并且我们是根据消息的key进行分区的, 所以不管我们重新发送了多少次,对于同一个key,始终会被送到同一个分区。
那么到消费者这里,最重要的问题是如何确保不会重复消费之前因为各种原因被重新发送到某个分区的消息。
如何确保不重复消费消息?
基本思路如下
- 我们在数据库中创建了一个已成功消费的消息表,里面只有一列,消息的key。当消费者消费逻辑成功之后,我们会把其key保存到这张表里 。
- 当消费者拉取新的一批消息时,我们会去数据库的消息表里查是否已经存在该消息的key,存在的话,就跳过实际的消费业务。
- 一批消息里也可能存在相同的key,所以我们处理完一次消费业务,就把该key放到一个set里,消费下一条消息时,则先去set里看一下,存在的话即跳过,不存在则正常执行消费业务。即使前面的消息消费业务失败了,后面相同key的消息也直接跳过,不会再次消费
消费者业务逻辑重试
对于消费者业务逻辑的重试,我们使用failsafe框架进行重试,该框架的使用可参考官方文档,这里不做过多赘述。
消费者提交
这里的方式采用的是Kafka权威指南中消费者一章中提出的方式。 异步+同步。平时使用异步提交,在关闭消费者时,使用同步提交,确保消费者退出之前将当前的offset提交上去。
使用统一反序列化器
在上一篇文章中,我们引入了kafka-json-serializer来帮我们统一序列化JSON,这里同样使用该依赖,帮我们统一反序列化,这样我们也不用对每一个需要反序列化的POJO都去实现Deserializer接口。
具体配置见下面详细代码的这两行
result.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaJsonDeserializer.class.getName());
result.put(KafkaJsonDeserializerConfig.JSON_VALUE_TYPE, valueType);
和序列化不同的是,反序列化我们需要知道目标POJO是哪个,所以这里第二个参数传的就是POJO.class.getName();
Partition 分区数到底应该设置多少合适?
在Kafka权威指南 > 第二章 Installing Kafka > Broker Configuration > Topic Defaults 中讨论了如何确定合适的分区数。
So if I want to be able to write and read 1 GB/sec from a topic, and I know each consumer can only process 50 MB/s, then I know I need at least 20 partitions. This way, I can have 20 consumers reading
from the topic and achieve 1 GB/sec.
if you don’t have this detailed information, our experience suggests that limiting the size of the partition on the disk to less than 6 GB per day of retention often gives satisfactory results
如果上面这些信息你都无法确定,我的实际经验是,通过不断地写入数据,直接对Kafak+Consumer代码进行压测,然后观察其消费延迟是多少,低了就增加分区数量,直到你觉得这个延迟是可以接受的。
一开始我司的某个topic的分区数量是3,很少,这种配置下,观察到消费者代码消息很慢。经过压测,将其调整至20个分区,对我们来说是一个可以接受的值
消费者参数配置及其说明
/**
* 以下配置建议搭配 官方文档 + kafka权威指南相关章节 + 实际业务场景需求 自己调整
* https://kafka.apache.org/26/documentation/#group.instance.id
*
* 为什么需要group.instance.id?
* 假设auto.offset.reset=latest
* 1. 如果没有group.instance.id,那么kafka会认为此消费者是dynamic member,在重启期间如果有消息发送到topic,那么重启之后,消费者会【丢失这部分消息】
* 假如auto.offset.reset=earliest
* 1. 如果没有group.instance.id,那么kafka会认为此消费者是dynamic member,在重启期间如果有消息发送到topic,那么重启之后,消费者会重复消费【全部消息】
*
* 光有group.instance.id还不够,还需要修改heartbeat.interval.ms和session.timeout.ms的值为合理的值
* 如果程序部署,重启期间,重启时间超过了session.timeout.ms的值,那么kafka会认为此消费者已经挂了会触发rebalance,在一些大型消息场景,rebalance的过程可能会很慢, 更详细的解释请参考
* https://kafka.apache.org/26/documentation/#static_membership
* @param groupInstanceId
*
* 建议设置client.id, 理由和{@link #loadProducerConfig()} 注释中的原因一样
* Consumer生成client.id的逻辑见 {@link ConsumerConfig#maybeOverrideClientId(Map)}
*/
public static Properties loadConsumerConfig(int groupInstanceId, String valueType) {
Properties result = new Properties();
result.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.102:9093");
rresult.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
result.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaJsonDeserializer.class.getName());
result.put(KafkaJsonDeserializerConfig.JSON_VALUE_TYPE, valueType);
result.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
// 代表此消费者是消费者组的static member
result.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "test-" + ++groupInstanceId);
// 建议设置client.id
result.put(ConsumerConfig.CLIENT_ID_CONFIG, SERVER_ID + "-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement());
// 修改heartbeat.interval.ms和session.timeout.ms的值,和group.instance.id配合使用,避免重启或重启时间过长的时候,触发rebalance
result.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000 * 60);
result.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000 * 60 * 5);
// 关闭自动提交
result.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.FALSE);
// 默认1MB,增加吞吐量,其设置对应的是每个分区,也就是说一个分区返回10MB的数据
result.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1048576 * 10);
result.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
// 返回全部数据的大小
result.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 1048576 * 100);
// 默认5分钟
result.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 1000 * 60 * 5);
return result;
}
重要的参数session.time.ms和heartbeat.interval.ms和group.instance.id
三者的使用方式见上面代码中的注释。
增加消费者的吞吐量
和上一篇文章一样,由于我们的邮件消息每个大概是20KB,使用默认的消费者参数,吞吐量是上不来的。 所以做了一些优化,除了消费者消费逻辑要尽可能简单之外,为了增加消费者的吞吐量,可以根据实际场景修改倒数第4、3、2个参数。
消费者消费的超时时间和poll()方法的关系
由max.poll.interval.ms参数控制,默认5分钟。如果消费者业务逻辑处理特别耗时,在5分钟之内没有再次调用poll()拉取消息,则Kafka认为消费者已死,根据具体配置会立刻触发rebalance还是等一段时间再触发rebalance。
这里特别强调一下,网上有一部分文章说是要确保消费逻辑在poll(timeUnit)时间内处理完,否则就会触发rebalance。这都是很早之前的Kafka版本了,是因为原来消费者的poll()线程和心跳线程使用的是同一个线程。现在的版本早就把这两个分开了。所以你只需要注意,自己的消费逻辑别超过max.poll.interval.ms即可,如果觉得不够用,也可自己调整。
poll()方法中的时间代表的是多长时间去拉取一次消息。假设你设置的是1分钟,你的消费逻辑处理的很快,可能用了10s。那么在你消费完了之后,消费者会在1分钟之后拉取新消息。
在消费者中使用手动提交。
消费者消费逻辑
这里要注意
- 如果消费逻辑可能抛出异常,则使用try-catch处理,防止因为抛出异常,导致我们错误的关闭了消费者
- 消费者消费逻辑失败时会重试,重试N次之后,我们会将其保存在数据库中,以便和生产者一样,定时处理失败的消息
- 消费逻辑没问题的话,则把该消息的key进行入库处理
@Log
public class MessageConsumerRunner implements Runnable {
private final AtomicBoolean closed = new AtomicBoolean(false);
private MessageAckConsumesSuccessService messageAckConsumesSuccessService = new MessageAckConsumesSuccessService();
private MessageFailedService messageFailedService = new MessageFailedService();
private final KafkaConsumer<String, UserDTO> consumer;
private final int consumerPollIntervalSecond;
public MessageConsumerRunner(KafkaConsumer<String, UserDTO> consumer, int consumerPollIntervalSecond) {
this.consumer = consumer;
this.consumerPollIntervalSecond = consumerPollIntervalSecond;
}
/**
* 1. 使用https://failsafe.dev/进行重试
* 2. 每次消费消息前,判断消息ID是否存在于数据库中和当前Set集合中,避免重复消费,
* 我们的消息时根据消息的key进行hash分区的,所以同一个消息即使生产多次,一定会到同一个partition中,partition动态增加引起的特殊情况不在考虑范围之内
* 4. 在一次消费消息中重试两次,如果两次都失败,那么将失败原因、消息的JSON字符串插入到message_failed表中,以便后续再次生产或排查问题
* 3. 平时异步提交,关闭消费者时使用同步提交
*/
@Override
public void run() {
AtomicReference<String> errorMessage = new AtomicReference<>(StringUtils.EMPTY);
RetryPolicy<Boolean> retryPolicy = RetryPolicy.<Boolean>builder()
.handle(Exception.class)
// 如果业务逻辑返回false或者抛出异常,则重试
.handleResultIf(Boolean.FALSE::equals)
// 不包含首次
.withMaxRetries(2)
.withDelay(Duration.ofMillis(200))
.onRetry(e -> log.warning("consume message failed, start the {}th retry"+ e.getAttemptCount()))
.onRetriesExceeded(e -> {
Optional.ofNullable(e.getException()).ifPresent(u -> errorMessage.set(u.getMessage()));
log.severe("max retries exceeded" + e.getException());
})
.build();
Fallback<Boolean> fallback = Fallback.<Boolean>builder(e -> {
// do nothing, suppress exceptions
}).build();
try {
consumer.subscribe(Collections.singletonList("email"));
while (!closed.get()) {
// get message from kafka
ConsumerRecords<String, UserDTO> records = consumer.poll(Duration.ofSeconds(consumerPollIntervalSecond));
if (records.isEmpty()) {
return;
}
Set<UserDTO> successConsumed = new HashSet<>();
Set<UserDTO> failedConsumed = new HashSet<>();
Map<String, String> failedConsumedReason = new HashMap<>();
// check message if exist in database
Set<String> checkingMessageIds = new HashSet<>(records.count());
records.iterator().forEachRemaining(item -> checkingMessageIds.add(item.value().getMessageId()));
Set<String> hasBeenConsumedMessageIds = messageAckConsumesSuccessService.checkMessageIfExistInDatabase(checkingMessageIds);
records.forEach(item -> {
if (hasBeenConsumedMessageIds.contains(item.value().getMessageId())) {
// if exist, continue
return;
}
// 每一批消息中也可能存在同样的消息,所以需要再次判断
hasBeenConsumedMessageIds.add(item.value().getMessageId());
try {
Failsafe.with(fallback, retryPolicy)
.onSuccess(e -> successConsumed.add(item.value()))
.onFailure(e -> {
failedConsumed.add(item.value());
failedConsumedReason.put(item.value().getMessageId(), StringUtils.isNotBlank(errorMessage.get()) ? errorMessage.get() : "no reason, may be check server log");
errorMessage.set(StringUtils.EMPTY);
})
.get(() -> {
// 这里是业务逻辑,可以返回true或false,为什么要这样?是因为上面RetryPolicy这里定义的boolean,根据自己实际业务设置相应的类型
return true;
});
// 这里要catch住所有业务异常,防止由业务异常导致消费者线程退出
}catch (Exception e) {
log.severe("failed to consume email message" + e);
failedConsumed.add(item.value());
failedConsumedReason.put(item.value().getMessageId(), StringUtils.isNotBlank(e.getMessage()) ? e.getMessage() : e.getCause().toString());
}
});
postConsumed(successConsumed, failedConsumed, failedConsumedReason);
// 平时使用异步提交
consumer.commitAsync();
}
}catch (WakeupException e) {
if (!closed.get()) {
throw e;
}
} finally {
// 消费者退出时使用同步提交
try {
consumer.commitSync();
} catch (Exception e) {
log.info("commit sync occur exception: " + e);
} finally{
try {
consumer.close();
}catch (Exception e) {
log.info("consumer close occur exception: " + e);
}
log.info( "shutdown kafka consumer complete");
}
}
}
/**
* 处理成功、成功后的回调、失败
* @param successConsumed
* @param failedConsumed
* @param failedConsumedReason
*/
private void postConsumed(Set<UserDTO> successConsumed, Set<UserDTO> failedConsumed, Map<String, String> failedConsumedReason) {
// 后置处理开启异步线程处理,不阻塞消费者线程
// 克隆传进来的集合,而不使用原集合的引用,因为原集合每次消费都会重置
Set<UserDTO> cloneSuccessConsumed = new HashSet<>(successConsumed);
Set<UserDTO> cloneFailedConsumed = new HashSet<>(failedConsumed);
Map<String, String> cloneFailedConsumedReason = new HashMap<>(failedConsumedReason);
new Thread( () -> {
if (!cloneSuccessConsumed.isEmpty()) {
messageAckConsumesSuccessService.insertMessageIds(cloneSuccessConsumed.stream().map(UserDTO::getMessageId).collect(Collectors.toSet()));
cloneFailedConsumed.forEach(item -> {
if (Objects.nonNull(item.getCallbackMetaData())) {
// do callback
CallbackProducer callbackProducer = new CallbackProducer();
callbackProducer.sendCallbackMessage(item.getCallbackMetaData(), MessageFailedPhrase.PRODUCER);
}
});
}
if (!cloneFailedConsumed.isEmpty()) {
ObjectMapper objectMapper = new ObjectMapper();
cloneFailedConsumed.forEach(item -> {
MessageFailedEntity entity = new MessageFailedEntity();
entity.setMessageId(item.getMessageId());
entity.setMessageType(MessageType.EMAIL);
entity.setMessageFailedPhrase(MessageFailedPhrase.CONSUMER);
entity.setFailedReason(cloneFailedConsumedReason.get(item.getMessageId()));
try {
entity.setMessageContentJsonFormat(objectMapper.writeValueAsString(item));
} catch (JsonProcessingException e) {
log.info("failed to convert UserDTO message to json string");
}
messageFailedService.saveOrUpdateMessageFailed(entity);
});
}
}).start();
}
public void shutdown() {
log.info( Thread.currentThread().getName() + " shutdown kafka consumer");
closed.set(true);
consumer.wakeup();
}
}
启动消费者
通过实现ServletContextListener接口对于方法使其在Tomcat启动之后,启动消费者
public class StartUpConsumerListener implements ServletContextListener {
/**
* 假设开启10个消费者.
*
* 消费者的数量要和partition的数量一致,实际情况下,可以调用AdminClient的方法获取到topic的partition数量,然后根据partition数量来创建消费者.
* @param sce
*/
@Override
public void contextInitialized(final ServletContextEvent sce) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10, 30L, TimeUnit.SECONDS, new LinkedBlockingDeque<>(100), new AbortPolicy());
for (int i = 0; i < 10; i++) {
KafkaConsumer<String, UserDTO> consumer = new KafkaConsumer<>(KafkaConfiguration.loadConsumerConfig(i, UserDTO.class.getName()));
MessageConsumerRunner messageConsumerRunner = new MessageConsumerRunner(consumer, 10);
// 使用另外一个线程来关闭消费者
Thread shutdownHooks = new Thread(messageConsumerRunner::shutdown);
KafkaListener.KAFKA_CONSUMERS.add(shutdownHooks);
// 启动消费者线程
threadPoolExecutor.execute(messageConsumerRunner);
}
}
}
关闭消费者
public class KafkaListener implements ServletContextListener {
public static final Vector<Thread> KAFKA_CONSUMERS = new Vector<>();
@Override
public void contextInitialized(ServletContextEvent sce) {
// do noting
}
@Override
public void contextDestroyed(ServletContextEvent sce) {
KAFKA_CONSUMERS.forEach(Thread::run);
}
}
配置listener
<?xml version="1.0" encoding="UTF-8" ?>
<web-app xmlns="https://jakarta.ee/xml/ns/jakartaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="https://jakarta.ee/xml/ns/jakartaee
https://jakarta.ee/xml/ns/jakartaee/web-app_6_0.xsd"
version="6.0">
<display-name>Kafka消息的消费者-消息系统</display-name>
<!-- listener的contextInitialized顺序按照声明顺序执行, contextDestroyed方法按照声明顺序反向执行-->
<listener>
<listener-class>com.message.server.listener.KafkaListener</listener-class>
</listener>
<listener>
<listener-class>com.message.server.listener.StartUpConsumerListener</listener-class>
</listener>
</web-app>
使用模版方法设计模式支持消费不同优先级的邮件
在实际应用过程中,邮件也有不同的优先级。不同的优先级对应不同的topic、partition、group以及poll的频率。
我们初期阶段支持3种级别的邮件,分别是
- High Priority,重要业务的邮件,例如:用户重置密码的确认邮件;订单确认邮件。此类场景要求更低的延迟、更高的吞吐量
- Medium Priority,介于1和3之间的业务
- Low Priority,例如:各种定时报表邮件。此类场景对延迟要求较低,只要能发出即可
但3者的整体步骤是一样的,即
- 拉取新消息
- 去重
- 发送邮件
- 失败重试
- 入库消息ID或失败记录
- 提交offset
所以可以抽取出一个abstract的父类,基本结构如下
public abstract class PriorityConsumer {
private static final Logger LOGGER = LogManager.getLogger();
abstract String getGroupId();
abstract String getTopic();
abstract int getPollIntervalSecond();
/**
* <p>Q: 为什么不将该方法添加final关键字以避免子类进行重写?
* <p>A: 因为这会导致子类注入到CDI容器中失败
*/
public void consumeMessage() {
// 具体的实现逻辑
}
}
子类实现如下
@ApplicationScoped
public class LowPriorityConsumer extends PriorityConsumer {
@PostConstruct
public void consume() {
consumeMessage();
}
@Override
String getGroupId() {
return LOW_PRIORITY_EMAIL_GROUP_ID;
}
@Override
String getTopic() {
return LOW_PRIORITY_EMAIL_TOPIC;
}
@Override
int getPollIntervalSecond() {
return LOW_PRIORITY_CONSUMER_INTERVAL_SECOND;
}
}
程序启动之后,只需要实例化LowPriorityConsumer即可
结语
- 在处理消费者相关逻辑时,我们重点关心如何确保消息不重复消费以及如何增加消费者的吞吐量
- 消费逻辑尽可能保证处理速度快,尽量减少耗时的逻辑
示例源码仓库
- Github地址
- 项目下message-server module代表生产者
- 运行时IDEA配置如下
我们生产者和消费者的正常情况都以处理完了,下一篇文章我们将重点处理生产者失败和消费者失败之后重新生产消息和消费消息的逻辑,以及说一下Kafka中的rebalance。
更多推荐
所有评论(0)