今天测试kafka,发现高版本的springboot整合kafka消息发送失败,低版本就可以,刚看是查了好久,就是没找到问题,查到有人说是版本不匹配问题,于是更换成springboo 1.5.13,成功了,亲测springboot2.1.0也可以

先测试的是boot2.0.1的版本,同样的代码使用boot的1.5.13就是ok的,具体什么原因不清楚,debug页面也买看到错误i

server.port=8888
logging.level.root=debug

#kafka的服务器地址
spring.kafka.bootstrap-servers=nodechd01:9092,node02:9092,node03:9092
#如果出现发送失败的情况,允许重试的次数
spring.kafka.producer.retries=0
#每个批次发送多发的数量
spring.kafka.producer.batch-size=4096
kafka.linger.ms=1
#定时发送。
spring.kafka.template.default-topic=test
spring.kafka.producer.buffer-memory=40968

代码案例


@SpringBootTest(classes=KafkaConfig.class)
@RunWith(SpringRunner.class)
public class TextCntroller {

    @Autowired
    KafkaTemplate kafkaTemplate;
    @Test
    public void senfTest01(){
        for(int i=0;i<100;i++){
            kafkaTemplate.send("pyptest01","key","test msg "+i);
        }
    }
}

package com.itheima.config;

import com.itheima.util.RoundRobinPartitioner;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;

import java.util.HashMap;
import java.util.Map;

/**
 * 创建kafkatmple
 */
@Configuration
public class KafkaConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrap_servers_config;
    @Value("${spring.kafka.producer.retries}")
    private int retries_config;
    @Value("${spring.kafka.producer.batch-size}")
    private int batch_size_config;
    @Value("${kafka.linger.ms}")
    private int linger_ms_config;
    @Value("${spring.kafka.producer.buffer-memory}")
    private int buffer_memory_config;
    @Bean
    public KafkaTemplate kafkaTemplate(){
        Map<String,Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrap_servers_config);
        configs.put(ProducerConfig.LINGER_MS_CONFIG,linger_ms_config);
        configs.put(ProducerConfig.RETRIES_CONFIG,retries_config);
        configs.put(ProducerConfig.BATCH_SIZE_CONFIG,batch_size_config);
        configs.put(ProducerConfig.BUFFER_MEMORY_CONFIG,buffer_memory_config);
        // 设置key value的序列化器
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // 指定自定义分区
        configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class);

        DefaultKafkaProducerFactory<String, String> kafkaProducerFactory = new DefaultKafkaProducerFactory<>(configs);
        return new KafkaTemplate(kafkaProducerFactory);
    }

}


package com.itheima.util;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
/**
*自定义分区规则
*/
public class RoundRobinPartitioner  implements Partitioner{
    AtomicInteger counter = new AtomicInteger(0);
    @Override
    public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
        Integer partitions = cluster.partitionCountForTopic(s);
        int curpartition  = counter.incrementAndGet() % partitions;
        if(counter.get()>4000){
            counter.set(0);
        }
        return curpartition;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

pom.xml配置

<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.13.RELEASE</version>
        <!--<version>2.0.1.RELEASE</version>-->
    </parent>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>


    <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>


        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
           <!-- <version>2.1.3.RELEASE</version>-->
          <!--  <version>1.0.6.RELEASE</version>-->
        </dependency>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>http-client</artifactId>
            <version>2.0.1.RELEASE</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

 

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐