近日我们项目组采用 Kafka 来做系统日志统一管理,但是天降横祸的让 Kafka 集群(3台服务器)都挂了,堪比中大奖的节奏,随之而来的是使用 Kafka 发送消息日志的服务全部卡死,经过排查发现居然是 Kafka 当机导致了调用 Kafka 发送日志服务一直处于阻塞状态。

最后我们在检查代码的时候发现,如果无法连接 Kafka 服务,则会出现一分钟的阻塞。以上问题有两种解决方案:

一、开启异步模式 ( @EnableAsync )

@EnableAsync
@Configuration
public class KafkaProducerConfig {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerConfig.class);

    @Value("${kafka.brokers}")
    private String servers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, GenericMessage> producerFactory(ObjectMapper objectMapper) {
        return new DefaultKafkaProducerFactory<>(producerConfigs(), new StringSerializer(), new JsonSerializer(objectMapper));
    }

    @Bean
    public KafkaTemplate<String, GenericMessage> kafkaTemplate(ObjectMapper objectMapper) {
        return new KafkaTemplate<String, GenericMessage>(producerFactory(objectMapper));
    }

    @Bean
    public Producer producer() {
        return new Producer();
    }
}
public class Producer {

    public static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);

    @Autowired
    private KafkaTemplate<String, GenericMessage> kafkaTemplate;

    @Async
    public void send(String topic, GenericMessage message) {
        ListenableFuture<SendResult<String, GenericMessage>> future = kafkaTemplate.send(topic, message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, GenericMessage>>() {

            @Override
            public void onSuccess(final SendResult<String, GenericMessage> message) {
                LOGGER.info("sent message= " + message + " with offset= " + message.getRecordMetadata().offset());
            }

            @Override
            public void onFailure(final Throwable throwable) {
                LOGGER.error("unable to send message= " + message, throwable);
            }
        });
    }
}

 

二、如果使用同步模式,可以通过修改配置参数 MAX_BLOCK_MS_CONFIG ( max.block.ms / 默认 60s ) 来缩短阻塞时间

package com.havent.demo.logger.config;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.scheduling.annotation.EnableAsync;

import java.util.HashMap;
import java.util.Map;

@EnableAsync
@Configuration
@EnableKafka
public class KafkaConfiguration {
    @Value("${spring.kafka.producer.bootstrap-servers}")
    private String serverAddress;
    
    public Map<String, Object> producerConfigs() {
        System.out.println("HH > serverAddress: " + serverAddress);

        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, serverAddress);

        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // 如果请求失败,生产者会自动重试,我们指定是0次,如果启用重试,则会有重复消息的可能性
        props.put(ProducerConfig.RETRIES_CONFIG, 0);

        // Request发送请求,即Batch批处理,以减少请求次数,该值即为每次批处理的大小
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);

        /**
         * 这将指示生产者发送请求之前等待一段时间,希望更多的消息填补到未满的批中。这类似于TCP的算法,例如上面的代码段,
         * 可能100条消息在一个请求发送,因为我们设置了linger(逗留)时间为1毫秒,然后,如果我们没有填满缓冲区,
         * 这个设置将增加1毫秒的延迟请求以等待更多的消息。 需要注意的是,在高负载下,相近的时间一般也会组成批,即使是
         * linger.ms=0。在不处于高负载的情况下,如果设置比0大,以少量的延迟代价换取更少的,更有效的请求。
         */
        props.put(ProducerConfig.LINGER_MS_CONFIG, 2000);

        /**
         * 控制生产者可用的缓存总量,如果消息发送速度比其传输到服务器的快,将会耗尽这个缓存空间。
         * 当缓存空间耗尽,其他发送调用将被阻塞,阻塞时间的阈值通过max.block.ms设定, 之后它将抛出一个TimeoutException。
         */
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);

        // 用于配置send数据或partitionFor函数得到对应的leader时,最大的等待时间,默认值为60秒
        // HH 警告:如无法连接 kafka 会导致程序卡住,尽量不要设置等待太久
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 100);


        // 消息发送的最长等待时间
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 100);

        // 0:不保证消息的到达确认,只管发送,低延迟但是会出现消息的丢失,在某个server失败的情况下,有点像TCP
        // 1:发送消息,并会等待leader 收到确认后,一定的可靠性
        // -1:发送消息,等待leader收到确认,并进行复制操作后,才返回,最高的可靠性
        props.put(ProducerConfig.ACKS_CONFIG, "0");

        System.out.println(props);
        return props;
    }

    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
        return kafkaTemplate;
    }
    
}

 

谨以此献给那些被 Spring Kafka 同步模式坑害又苦无出路的同胞。。。

 

转载于:https://my.oschina.net/u/943746/blog/1928471

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐