近日我们项目组采用 Kafka 来做系统日志统一管理,但是天降横祸的让 Kafka 集群(3台服务器)都挂了,堪比中大奖的节奏,随之而来的是使用 Kafka 发送消息日志的服务全部卡死,经过排查发现居然是 Kafka 当机导致了调用 Kafka 发送日志服务一直处于阻塞状态。
最后我们在检查代码的时候发现,如果无法连接 Kafka 服务,则会出现一分钟的阻塞。以上问题有两种解决方案:
一、开启异步模式 ( @EnableAsync )
@EnableAsync
@Configuration
public class KafkaProducerConfig {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerConfig.class);
@Value("${kafka.brokers}")
private String servers;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return props;
}
@Bean
public ProducerFactory<String, GenericMessage> producerFactory(ObjectMapper objectMapper) {
return new DefaultKafkaProducerFactory<>(producerConfigs(), new StringSerializer(), new JsonSerializer(objectMapper));
}
@Bean
public KafkaTemplate<String, GenericMessage> kafkaTemplate(ObjectMapper objectMapper) {
return new KafkaTemplate<String, GenericMessage>(producerFactory(objectMapper));
}
@Bean
public Producer producer() {
return new Producer();
}
}
public class Producer {
public static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);
@Autowired
private KafkaTemplate<String, GenericMessage> kafkaTemplate;
@Async
public void send(String topic, GenericMessage message) {
ListenableFuture<SendResult<String, GenericMessage>> future = kafkaTemplate.send(topic, message);
future.addCallback(new ListenableFutureCallback<SendResult<String, GenericMessage>>() {
@Override
public void onSuccess(final SendResult<String, GenericMessage> message) {
LOGGER.info("sent message= " + message + " with offset= " + message.getRecordMetadata().offset());
}
@Override
public void onFailure(final Throwable throwable) {
LOGGER.error("unable to send message= " + message, throwable);
}
});
}
}
二、如果使用同步模式,可以通过修改配置参数 MAX_BLOCK_MS_CONFIG ( max.block.ms / 默认 60s ) 来缩短阻塞时间
package com.havent.demo.logger.config;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.scheduling.annotation.EnableAsync;
import java.util.HashMap;
import java.util.Map;
@EnableAsync
@Configuration
@EnableKafka
public class KafkaConfiguration {
@Value("${spring.kafka.producer.bootstrap-servers}")
private String serverAddress;
public Map<String, Object> producerConfigs() {
System.out.println("HH > serverAddress: " + serverAddress);
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, serverAddress);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 如果请求失败,生产者会自动重试,我们指定是0次,如果启用重试,则会有重复消息的可能性
props.put(ProducerConfig.RETRIES_CONFIG, 0);
// Request发送请求,即Batch批处理,以减少请求次数,该值即为每次批处理的大小
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);
/**
* 这将指示生产者发送请求之前等待一段时间,希望更多的消息填补到未满的批中。这类似于TCP的算法,例如上面的代码段,
* 可能100条消息在一个请求发送,因为我们设置了linger(逗留)时间为1毫秒,然后,如果我们没有填满缓冲区,
* 这个设置将增加1毫秒的延迟请求以等待更多的消息。 需要注意的是,在高负载下,相近的时间一般也会组成批,即使是
* linger.ms=0。在不处于高负载的情况下,如果设置比0大,以少量的延迟代价换取更少的,更有效的请求。
*/
props.put(ProducerConfig.LINGER_MS_CONFIG, 2000);
/**
* 控制生产者可用的缓存总量,如果消息发送速度比其传输到服务器的快,将会耗尽这个缓存空间。
* 当缓存空间耗尽,其他发送调用将被阻塞,阻塞时间的阈值通过max.block.ms设定, 之后它将抛出一个TimeoutException。
*/
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);
// 用于配置send数据或partitionFor函数得到对应的leader时,最大的等待时间,默认值为60秒
// HH 警告:如无法连接 kafka 会导致程序卡住,尽量不要设置等待太久
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 100);
// 消息发送的最长等待时间
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 100);
// 0:不保证消息的到达确认,只管发送,低延迟但是会出现消息的丢失,在某个server失败的情况下,有点像TCP
// 1:发送消息,并会等待leader 收到确认后,一定的可靠性
// -1:发送消息,等待leader收到确认,并进行复制操作后,才返回,最高的可靠性
props.put(ProducerConfig.ACKS_CONFIG, "0");
System.out.println(props);
return props;
}
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
return kafkaTemplate;
}
}
谨以此献给那些被 Spring Kafka 同步模式坑害又苦无出路的同胞。。。
所有评论(0)