由于新公司是做物联网的,公司刚起步,没什么项目,就是在做一些基础的服务的搭建,现在微服务这么火,可想而知,Spring Boot ,Spring Cloud 是必须要会的技能,而做物联网,把各种智能设备的数据采集上来,也避免不了要用到消息系统。所以我们的架构师从众多消息中间件中选出了kafka和mqtt。mqtt在物联网中的作用不言而去,我这边也在学,从mosquitto服务端搭建,到通过mqtt的java客户端paho 实现消息的生产和消费,不过用paho实现太过复杂,还是用springboot为我们提供的实现方式简便。no no,偏离主题了,今天主要说的是kafka。

    首先呢,kafka集群和zookeeper的集群的搭建我这就不赘述了,网上有很多,其实还是考研自己的linux水平,linux熟的 这些服务搭建 就是小菜一碟。

    kafka消费者有一个熟悉groupId 就是一个topic中的消息只能被同一个groupId的消费者中的一个消费者消费。

这个groupId,在配置消费者时指定。

但是问题来了,怎么实现让一个topic可以让不同group消费呢。

这个为也不会,哈哈

所以上网查了,看了一个说的特别好。

意思就是

goupid不要用配置文件配置的方式

细心的话,会发现@KafkaListener 注解,里面有一个containerFactory参数,就是让你指定容器工厂的

动手吧。

新建一个KafkaConsumerConfig类,代码如下,指定了两个容器,也就两个group

分别为kafkaListenerContainerFactory1和kafkaListenerContainerFactory2

import java.util.HashMap;
import java.util.Map;
 
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
 
@Configuration
public class KafkaConsumerConfig {
 
    private String brokers = "192.168.52.130:9092,192.168.52.131:9092,192.168.52.133:9092";
 
    private String group1 = "test1";
    private String group2 = "test2";
 
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory1() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
        factory.setConsumerFactory(consumerFactory1());
        factory.setConcurrency(4);
        factory.getContainerProperties().setPollTimeout(4000);
        return factory;
    }
    
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory2() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
        factory.setConsumerFactory(consumerFactory2());
        factory.setConcurrency(4);
        factory.getContainerProperties().setPollTimeout(4000);
        return factory;
    }
    
    public Map<String, Object> getCommonPropertis() {
        Map<String, Object> properties = new HashMap<String, Object>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, group1);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        return properties;
    }
    
 
    public ConsumerFactory<String, String> consumerFactory1() {
        Map<String, Object> properties = getCommonPropertis();
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, group1);
        return new DefaultKafkaConsumerFactory<String, String>(properties);
    }
    
    public ConsumerFactory<String, String> consumerFactory2() {
    	 Map<String, Object> properties = getCommonPropertis();
         properties.put(ConsumerConfig.GROUP_ID_CONFIG, group2);
         return new DefaultKafkaConsumerFactory<String, String>(properties);
    }
}

上面代码中,其实,很多配置项,你也可以直接用@value的方式,从配置文件中读取过来,那么需要修改参数值的时候,就直接更改配置文件就行了,这点相信就不用教了,不懂的网上一搜一堆。

最后,在@KafkaListener 中指定容器名称

@KafkaListener(id="test1",topics = "test-topic", containerFactory="kafkaListenerContainerFactory1")
@KafkaListener(id="test2",topics = "test-topic", containerFactory="kafkaListenerContainerFactory2")
启动,你就会发现,卧槽,还真可以
[           main] xxx     : Kafka version : 0.10.1.1
[           main] xxx     : Kafka commitId : f10ef2720b03b247
[           main] xxx     : Tomcat started on port(s): 82 (http)
[           main] xxx     : Started App in 3.913 seconds (JVM running for 4.321)
[    test2-0-C-1] xxx     : Discovered coordinator 192.168.52.133:9092 (id: 2147483644 rack: null) for group test2.
[    test1-0-C-1] xxx     : Discovered coordinator 192.168.52.131:9092 (id: 2147483645 rack: null) for group test1.

至此,就实现了多个customer不同group的功能,亲测有效。

参考:https://blog.csdn.net/caijiapeng0102/article/details/80765923

https://www.jianshu.com/p/6a44da908e48  高版本 在@KafkaListener 注解中有groupId属性可以设置


Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐