springboot整合kafka消息发送失败问题-版本不匹配问题
今天测试kafka,发现高版本的springboot整合kafka消息发送失败,低版本就可以,刚看是查了好久,就是没找到问题,查到有人说是版本不匹配问题,于是更换成springboo 1.5.13,成功了,亲测springboot2.1.0也可以先测试的是boot2.0.1的版本,同样的代码使用boot的1.5.13就是ok的,具体什么原因不清楚,debug页面也买看到错误iserver...
·
今天测试kafka,发现高版本的springboot整合kafka消息发送失败,低版本就可以,刚看是查了好久,就是没找到问题,查到有人说是版本不匹配问题,于是更换成springboo 1.5.13,成功了,亲测springboot2.1.0也可以
先测试的是boot2.0.1的版本,同样的代码使用boot的1.5.13就是ok的,具体什么原因不清楚,debug页面也买看到错误i
server.port=8888 logging.level.root=debug #kafka的服务器地址 spring.kafka.bootstrap-servers=nodechd01:9092,node02:9092,node03:9092 #如果出现发送失败的情况,允许重试的次数 spring.kafka.producer.retries=0 #每个批次发送多发的数量 spring.kafka.producer.batch-size=4096 kafka.linger.ms=1 #定时发送。 spring.kafka.template.default-topic=test spring.kafka.producer.buffer-memory=40968
代码案例
@SpringBootTest(classes=KafkaConfig.class)
@RunWith(SpringRunner.class)
public class TextCntroller {
@Autowired
KafkaTemplate kafkaTemplate;
@Test
public void senfTest01(){
for(int i=0;i<100;i++){
kafkaTemplate.send("pyptest01","key","test msg "+i);
}
}
}
package com.itheima.config;
import com.itheima.util.RoundRobinPartitioner;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import java.util.HashMap;
import java.util.Map;
/**
* 创建kafkatmple
*/
@Configuration
public class KafkaConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrap_servers_config;
@Value("${spring.kafka.producer.retries}")
private int retries_config;
@Value("${spring.kafka.producer.batch-size}")
private int batch_size_config;
@Value("${kafka.linger.ms}")
private int linger_ms_config;
@Value("${spring.kafka.producer.buffer-memory}")
private int buffer_memory_config;
@Bean
public KafkaTemplate kafkaTemplate(){
Map<String,Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrap_servers_config);
configs.put(ProducerConfig.LINGER_MS_CONFIG,linger_ms_config);
configs.put(ProducerConfig.RETRIES_CONFIG,retries_config);
configs.put(ProducerConfig.BATCH_SIZE_CONFIG,batch_size_config);
configs.put(ProducerConfig.BUFFER_MEMORY_CONFIG,buffer_memory_config);
// 设置key value的序列化器
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 指定自定义分区
configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class);
DefaultKafkaProducerFactory<String, String> kafkaProducerFactory = new DefaultKafkaProducerFactory<>(configs);
return new KafkaTemplate(kafkaProducerFactory);
}
}
package com.itheima.util;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
/**
*自定义分区规则
*/
public class RoundRobinPartitioner implements Partitioner{
AtomicInteger counter = new AtomicInteger(0);
@Override
public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
Integer partitions = cluster.partitionCountForTopic(s);
int curpartition = counter.incrementAndGet() % partitions;
if(counter.get()>4000){
counter.set(0);
}
return curpartition;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}
pom.xml配置
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.13.RELEASE</version>
<!--<version>2.0.1.RELEASE</version>-->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<!-- <version>2.1.3.RELEASE</version>-->
<!-- <version>1.0.6.RELEASE</version>-->
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>http-client</artifactId>
<version>2.0.1.RELEASE</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
更多推荐
已为社区贡献1条内容
所有评论(0)