本文主要简述实践部分,不涉及kafka原理等。

 

 

kafka安装

http://kafka.apache.org/downloads

在官网下载这个版本的kafka(其它版本没试过,可能会有与springboot版本出现冲突的问题)

上传到linux服务器,然后解压到一个目录下,进入kafka的config目录修改配置文件server.properties,

将其中的一个已经被注释的配置  listeners=PLAINTEXT://localhost:9092去掉注释

然后将localhost修改成linux服务器的ip地址

这里我是改成listeners=PLAINTEXT://192.168.45.128:9092

我的linux服务器是在本地VM虚拟机里的

其他的配置文件都不需要改,因为我们使用的是单机版的kafka,而且这个版本的kafka已经内置了一个zookeeper环境,所以可以直接使用。

 

 

kafka启动

需要root权限,输入指令su然后输入root的密码切换成root权限,否则启动会报错

1.启动zookeeper

cd进入kafka解压目录,输入指令启动zookeeper

./bin/zookeeper-server-start.sh  ./config/zookeeper.properties  

2.启动kafka

./bin/kafka-server-start.sh  ./config/server.properties

 

一般没报错的话就说明启动成功了,实践到这里只出现了无root权限启动失败问题,其它问题没碰到。

 

 

springboot整合kafka

环境:

springboot  2.1.7.RELEASE

添加spring-kafka   2.2.8.RELEASE依赖

 

以下的实践用的是配置类的形式去配置kafka,不会用到配置文件,因为用配置类这种方式比较灵活,可以配置不同参数值的主题、生产者、消费者等。

 

1.创建主题配置文件

@Configuration
public class TopicConfigure {

    @Bean
    public KafkaAdmin admin(){
        Map<String,Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.45.128:9092");
        return new KafkaAdmin(configs);
    }

    @Bean
    public NewTopic zhubo(){
        return new NewTopic("zhubo",10, (short) 1);
    }
}

@Configuration的作用是说明这个类是配置类

@Bean的作用是将类注入到容器

admin方法是配置kafka地址

zhubo方法是配置主题,其中  "zhubo"是主题名,10是分区数量,1是副本数量。这里我们用的是单机版,所以副本用1个就可以了,分区数量按实际所需来配置。

分区的作用是:假设主题有10个分区,那么kafka会尽可能将这些分区均匀分布在各个服务器中,那么多个消费者消费主题的时候,就可以进行轮询消费,例如消费者1从服务器1分区消费,消费者2从服务器2分区消费,减轻了单个服务器的压力。

副本的作用是:备份数据,以分区为单位进行备份,假设一个分区有n个副本,那么n肯定不能大于服务器数量,因为副本肯定是存在多个服务器中以保证数据不会丢失的,kafka一般默认每个服务器存一份。

 

这样我们主题就配置好了,启动springboot入口类,就创建好了一个名叫"zhubo",分区数为10,副本数为1的主题

如果还需要创建其它主题,只需要在配置类中添加多几个方法就可以了,例如:

    @Bean
    public NewTopic zhubo2(){
        return new NewTopic("zhubo2",10, (short) 1);
    }

 

2.创建生产者配置类

@Configuration
public class ProducerConfigure {

    public ProducerFactory<Integer,String> producerFactory(){
        return new DefaultKafkaProducerFactory<Integer, String>(producerConfigs());
    }

    private Map<String,Object> producerConfigs() {
        Map<String,Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.45.128:9092");
        props.put(ProducerConfig.ACKS_CONFIG,"all");
        props.put(ProducerConfig.RETRIES_CONFIG,0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG,1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.IntegerSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    @Bean
    public KafkaTemplate<Integer,String> kafkaTemplate(){
        return new KafkaTemplate<Integer, String>(producerFactory());
    }
}

参数说明:

ACKS_CONFIG

这个参数关于高可用,涉及数据的安全。 有三个值:0,1,all

一般kafka为了高可用需要用集群的方式部署,其中有一个leader,多个follower。

下面是各个参数值的说明

0:生产者只要把消息发出去了,就认为消息发送成功了。

1:生产者要把消息发出去,leader也把消息写入本地磁盘了,才认为成功了。

all:生产者要把消息发出去,leader和fllower都把消息写入磁盘了,才认为成功了。

为了数据安全,一般都是用all,当然,只有在集群的情况下才有效,单机是无效的,一般用all+集群,可以保证高可用和数据安全。

RETRIES_CONFIG

重试次数设置。重试时producer会重新发送之前由于瞬时原因出现失败的消息。瞬时失败的原因可能包括:元数据信息失效、副本数量不足、超时、位移越界或未知分区等。倘若设置了retries>0,那么这些情况下producer会尝试重试。

Producer还有个参数max.in.flight.request.per.connection,这个参数是控制producer的io线程在单个socket连接上能够发送未应答producer请求的最大数量,增加此值可以增加io线程的吞吐量,如果该参数大于1,那么设置retries就有可能造成发送消息的乱序。版本为0.11.1.0的kafka支持消息的重试不会造成消息的重复发送。

BATCH_SIZE_CONFIG

当多个消息发往同一个分区,生产者会将它们放在同一个批次,该参数指定了一个批次可以使用的内存大小,按照字节数进行计算,当批次满了会被发送,没满的情况下也有可能被发送,所以即使设置得再大也不会消息延迟,设置太大会占用内存,设置太小会频繁发送消息,增加额外开销。

LINGER_MS_CONFIG

该参数指定了生产者在发送批次之前等待更多消息加入批次的时间。批次会在满的时候或者这个参数设置的时间到了的时候发送,这个参数设置得大的话会导致消息延迟,可以设置小一点,虽然会导致微小的延迟,但是批次可以多加入一些消息一起发送,减少开销。

BUFFER_MEMORY_CONFIG

设置生产者内存缓存区的大小,缓存生产者发往服务器的消息,如果生产者发送速率大于服务器接收速率,那么会导致生产者内存空间不足,此时send方法要么阻塞,要么抛出异常,具体行为依赖于max.block.ms参数,这个参数设置的是阻塞时间,超过这个时间就会抛出异常。默认值是33554432字节,合计32M。

KEY_SERIALIZER_CLASS_CONFIG

实现了DESERIALIZER的key的序列化类

VALUE_SERIALIZER_CLASS_CONFIG

实现了DESERIALIZER的value的序列化类

 

如果需要多个不同参数值的生产者,也是只要新建多几个方法就可以了。

 

 

3.新建消费者配置类

@Configuration
public class ConsumerConfigure {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<Integer,String> kafkaListenerContainerFactory(){
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    private ConsumerFactory<Integer,String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<Integer, String>(consumerConfigs());
    }

    private Map<String,Object> consumerConfigs() {
        HashMap<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.45.128:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}

参数说明:

GROUP_ID_CONFIG

消费者组,如果消费者通过订阅主题来实现组管理功能,或者使用基于kafka的偏移量管理策略,这个配置是必须的。

ENABLE_AUTO_COMMIT_CONFIG

如果设为true,消费者的偏移量会定期在后台提交,即定期地往zookeeper写入每个分区的offset

AUTO_COMMIT_INTERVAL_MS_CONFIG

往zookeeper上写offset的频率

KEY_DESERIALIZER_CLASS_CONFIG

实现了DESERIALIZER的key的反序列化类

VALUE_DESERIALIZER_CLASS_CONFIG

实现了DESERIALIZER的value的反序列化类

 

如果需要配置多个不同参数的消费者,只要添加多几个方法就可以了,例如:

    @Bean
    public ConcurrentKafkaListenerContainerFactory<Integer,String> kafkaListenerContainerFactory2(){
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    private ConsumerFactory<Integer,String> consumerFactory2() {
        return new DefaultKafkaConsumerFactory<Integer, String>(consumerConfigs());
    }

    private Map<String,Object> consumerConfigs2() {
        HashMap<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.45.128:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG,"test2");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

 

 

 

新建消费者类

public class SimpleConsumerListener {

    @KafkaListener(id = "test1",topics = "zhubo",containerFactory = "kafkaListenerContainerFactory")
    public void listen1(String str){
        System.out.println("listen1接收:"+str);
    }

    @KafkaListener(id = "test2",topics = "zhubo",containerFactory = "kafkaListenerContainerFactory2")
    public void listen2(String str){
        System.out.println("listen2接收:"+str);
    }
}

其中的id保证其唯一性就可以了,即各个方法添加的注释的id不同,topics是主题名,containerFactory是指向消费者配置类配置的方法,指向之后消费者的参数就是使用的我们配置的参数,例如消费者配置类中的kafkaListenerContainerFactory方法我们配置的groupid是test,而kafkaListenerContainerFactory2方法我们配置的groupid是test2,那么消费者listen1就是test组的,listen2就是test2组的。

 

消费者类需要在消费者配置类中注入到容器,在消费者配置类中添加以下方法注入:

    @Bean
    public SimpleConsumerListener simpleConsumerListener(){
        return new SimpleConsumerListener();
    }

 

至于生产者类,这里我们不创建,而是在test类中直接注入template来发送消息

@RunWith(SpringRunner.class)
@SpringBootTest
public class Kafkatest2ApplicationTests {

    @Autowired
    private KafkaTemplate kafkaTemplate;
    @Autowired
    private KafkaTemplate kafkaTemplate2;

    @Test
    public void contextLoads() {
        kafkaTemplate.send("zhubo","heihei");
    }

    @Test
    public void contextLoads2() {
        kafkaTemplate2.send("zhubo","heihei");
    }

}

其中的kafkaTemplate和kafkaTemplate2参数名是对应的生产者配置类中的方法名,这样子才能正常注入

 

启动springboot入口类,然后运行测试类的测试方法,就可以在控制台中看到

listen1接收:heihei

listen2接收:heihei

 

说明消费者接收到了生产者发送的消息

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐