SpringBoot学习:kafka批量并发消费配置(consumer+producer)
springboot使用的是2.0.0版本,spring-kafka使用的是2.1.4版本,配置的时候遇到了一些问题,在此总结一下:1. session-timeout连接超时时间,之前 配置的是3000(ms),一直报异常,堆栈信息提示连接超时时间不能大于“某时间”,这里一直没弄懂“某时间”是指哪个时间。ps:忘记“ ”里的时间是什么了,可能是我英语太差的原因。2. 是否开启自动提交...
springboot使用的是2.0.0版本,spring-kafka使用的是2.1.4版本,配置的时候遇到了一些问题,在此总结一下:
1. session-timeout
连接超时时间,之前 配置的是3000(ms),一直报异常,堆栈信息提示 连接超时时间不能大于“某时间”,这里一直没弄懂“某时间”是指哪个时间,后改为6000(ms)(若有大佬知道的,欢迎骚扰!!!)。
ps:忘记“ ”里的时间是什么了,可能是我英语太差的原因。
2. 是否开启自动提交enable-auto-commit: false
,不开启需手动提交偏移量(offset)
enable-auto-commit: false #是否开启自动提交
#auto-commit-interval: 1000 #自动提交的间隔时间,自动提交去掉#号
//设置偏移量的方式
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
//kafkalistener中加入
ack.acknowledge();//手动提交偏移量
- 向kafka发送数据以及消费kafka中的数据,json的序列化和反序列化使用了不同的json框架,我在此就分别用了jackson和fastjson,导致消费抛出异常。
springboot整合kafka配置:
- pom文件;
<dependencies>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
- 配置文件:
#kafka配置信息
kafka:
producer:
bootstrap-servers: 10.161.11.222:6667,10.161.11.223:6667,10.161.11.224:6667
batch-size: 16785 #一次最多发送数据量
retries: 1 #发送失败后的重复发送次数
buffer-memory: 33554432 #32M批处理缓冲区
linger: 1
consumer:
bootstrap-servers: 10.161.11.222:6667,10.161.11.223:6667,10.161.11.224:6667
auto-offset-reset: latest #最早未被消费的offset earliest
max-poll-records: 3100 #批量消费一次最大拉取的数据量
enable-auto-commit: false #是否开启自动提交
auto-commit-interval: 1000 #自动提交的间隔时间
session-timeout: 20000 #连接超时时间
max-poll-interval: 15000 #手动提交设置与poll的心跳数,如果消息队列中没有消息,等待毫秒后,调用poll()方法。如果队列中有消息,立即消费消息,每次消费的消息的多少可以通过max.poll.records配置。
max-partition-fetch-bytes: 15728640 #设置拉取数据的大小,15M
listener:
batch-listener: true #是否开启批量消费,true表示批量消费
concurrencys: 3,6 #设置消费的线程数
poll-timeout: 1500 #只限自动提交,
- 生产者配置:
/**
* @Auther: hs
* @Date: 2019/3/6 21:57
* @Description:
*/
@Configuration
@EnableKafka
public class KafkaProducerConfig {
@Value("${kafka.producer.bootstrap-servers}")
private String bootstrapServers;
@Value("${kafka.producer.retries}")
private Integer retries;
@Value("${kafka.producer.batch-size}")
private Integer batchSize;
@Value("${kafka.producer.buffer-memory}")
private Integer bufferMemory;
@Value("${kafka.producer.linger}")
private Integer linger;
private Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>(7);
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.RETRIES_CONFIG, retries);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
private ProducerFactory<String, String> producerFactory() {
DefaultKafkaProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(producerConfigs());
/*producerFactory.transactionCapable();
producerFactory.setTransactionIdPrefix("hous-");*/
return producerFactory;
}
/*@Bean
public KafkaTransactionManager transactionManager() {
KafkaTransactionManager manager = new KafkaTransactionManager(producerFactory());
return manager;
}*/
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
- KafkaSender:
/**
* @Auther: hs
* @Date: 2019/3/6 23:46
* @Description:
*/
@Component
@Slf4j
public class KafkaSender {
private final KafkaTemplate<String, String> KAFKA_TEMPLATE;
@Autowired
public KafkaSender(KafkaTemplate<String, String> kafkaTemplate) {
this.KAFKA_TEMPLATE = kafkaTemplate;
}
public void sendMessage(String topic, String message){
ListenableFuture<SendResult<String, String>> sender = KAFKA_TEMPLATE.send(new ProducerRecord<>(topic, message));
// //发送成功
// SuccessCallback successCallback = result -> log.info("数据发送成功!");
// //发送失败回调
// FailureCallback failureCallback = ex -> log.error("数据发送失败!");
sender.addCallback(result -> {}, ex -> log.error("数据发送失败!"));
}
}
- 消费者配置:
/**
* @author: hs
* @Date: 2019/3/5 19:50
* @Description:
*/
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value("${kafka.consumer.bootstrap-servers}")
private String bootstrapServers;
@Value("${kafka.consumer.enable-auto-commit}")
private Boolean autoCommit;
@Value("${kafka.consumer.auto-commit-interval}")
private Integer autoCommitInterval;
@Value("${kafka.consumer.max-poll-records}")
private Integer maxPollRecords;
@Value("${kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Value("#{'${kafka.listener.concurrencys}'.split(',')[0]}")
private Integer concurrency3;
@Value("#{'${kafka.listener.concurrencys}'.split(',')[1]}")
private Integer concurrency6;
@Value("${kafka.listener.poll-timeout}")
private Long pollTimeout;
@Value("${kafka.consumer.session-timeout}")
private String sessionTimeout;
@Value("${kafka.listener.batch-listener}")
private Boolean batchListener;
@Value("${kafka.consumer.max-poll-interval}")
private Integer maxPollInterval;
@Value("${kafka.consumer.max-partition-fetch-bytes}")
private Integer maxPartitionFetchBytes;
/**
* 并发数6
*
* @return
*/
@Bean
@ConditionalOnMissingBean(name = "kafkaBatchListener6")
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaBatchListener6() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = kafkaListenerContainerFactory();
factory.setConcurrency(concurrency6);
return factory;
}
/**
* 并发数3
*
* @return
*/
@Bean
@ConditionalOnMissingBean(name = "kafkaBatchListener3")
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaBatchListener3() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = kafkaListenerContainerFactory();
factory.setConcurrency(concurrency3);
return factory;
}
private ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
//批量消费
factory.setBatchListener(batchListener);
//如果消息队列中没有消息,等待timeout毫秒后,调用poll()方法。
// 如果队列中有消息,立即消费消息,每次消费的消息的多少可以通过max.poll.records配置。
//手动提交无需配置
factory.getContainerProperties().setPollTimeout(pollTimeout);
//设置提交偏移量的方式, MANUAL_IMMEDIATE 表示消费一条提交一次;MANUAL表示批量提交一次
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
return factory;
}
private ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
private Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>(10);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval);
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, maxPartitionFetchBytes);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
}
- KafkaListener:
/**
* @Auther: hs
* @Date: 2019/3/7 00:12
* @Description:
*/
@Slf4j
@Component
public class KafkaListeners {
@KafkaListener(containerFactory = "kafkaBatchListener6",topics = {"#{'${kafka.listener.topics}'.split(',')[0]}"})
public void batchListener(List<ConsumerRecord<?,?>> records,Acknowledgment ack){
List<User> userList = new ArrayList<>();
try {
records.forEach(record -> {
User user = JSON.parseObject(record.value().toString(),User.class);
user.getCreateTime().format(DateTimeFormatter.ofPattern(Contants.DateTimeFormat.DATE_TIME_PATTERN));
userList.add(user);
});
} catch (Exception e) {
log.error("Kafka监听异常"+e.getMessage(),e);
} finally {
ack.acknowledge();//手动提交偏移量
}
}
}
- 定时任务:
/**
* @Auther: hs
* @Date: 2019/3/7 00:53
* @Description:
*/
@Component
public class UserTask {
@Value("#{'${kafka.listener.topics}'.split(',')}")
private List<String> topics;
private final MultiService MUlTI_SERVICE;
private final KafkaSender KAFKA_SENDER;
@Autowired
public UserTask(MultiService multiService, KafkaSender kafkaSender){
this.MUlTI_SERVICE = multiService;
this.KAFKA_SENDER = kafkaSender;
}
@Scheduled(fixedRate = 10 * 1000)
public void addUserTask() {
User user=new User();
user.setUserName("HS");
user.setDescription("text");
user.setCreateTime(LocalDateTime.now());
String JSONUser = JSON.toJSONStringWithDateFormat(user,
Contants.DateTimeFormat.DATE_TIME_PATTERN,//日期格式化
SerializerFeature.PrettyFormat);//格式化json
for (int i = 0; i < 700; i++) {
KAFKA_SENDER.sendMessage(topics.get(0), JSONUser);
}
MUlTI_SERVICE.addUser(user);
}
}
- 其他类:
/**
* @Auther: hs
* @Date: 2019/3/6 20:21
* @Description:
*/
public class Contants {
public static class DateTimeFormat{
public static final String DATE_TIME_PATTERN="yyyy-MM-dd HH:mm:ss";
public static final String DATE_PATTERN="yyyy-MM-dd";
public static final String TIME_PATTERN="HH:mm:ss";
public static final String DATE_TIME_STAMP="yyyyMMddHHmmss";
public static final String DATE_STAMP="yyyyMMdd";
public static final String TIME_STAMP="HHmmss";
}
}
/**
* @Auther: hs
* @Date: 2019/2/23 17:53
* @Description:
*/
@Data
public class User {
private Integer id;
private String userName;
private String description;
//@JSONField(format = "yyyy-MM-dd HH:mm:ss")
private LocalDateTime createTime;
}
- 启动类:
@ComponentScan("com.*****")
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
@EnableScheduling
@EnableAsync
public class MysqldbApplication implements CommandLineRunner {
public static void main(String[] args) {
SpringApplication.run(MysqldbApplication.class, args);
}
@Override
public void run(String... strings) throws Exception {
JSON.DEFFAULT_DATE_FORMAT= Contants.DateTimeFormat.DATE_TIME_PATTERN;
}
}
总结:自动提交,在服务启停时,会有重复数据被生产到kafka中,保证吞吐量的同时,降低了kafka的原子性;手动提交,保证了kafka的原子性,同时降低了kafka的吞吐量,实际开发中,可跟随数据量的大小,自行分析配置。
error:
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:721)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:599)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1242)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.ackImmediate(KafkaMessageListenerContainer.java:789)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processAck(KafkaMessageListenerContainer.java:772)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$2400(KafkaMessageListenerContainer.java:314)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ConsumerBatchAcknowledgment.acknowledge(KafkaMessageListenerContainer.java:1342)
at com.staryea.servicelevel.adjustment.kafka.KafkaListeners.spanBatchListener(KafkaListeners.java:66)
at sun.reflect.GeneratedMethodAccessor76.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:181)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:114)
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48)
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:248)
... 9 common frames omitted
若调整session-timeout
不起效果,请调整max-poll-interval
参数,一条业务线没有跑完,时间已经超出了要poll的时间,就会报如上错误。
若数据积压,可以调整max-partition-fetch-bytes
参数,默认1M(110241024),只会拉取1M的数据,配合max-poll-records
可以限制批量拉取数据的数量
AckMode :
RECORD
每处理一条commit一次
BATCH(默认)
每次poll的时候批量提交一次,频率取决于每次poll的调用频率
TIME
每次间隔ackTime的时间去commit(跟auto commit interval有什么区别呢?)
COUNT
累积达到ackCount次的ack去commit
COUNT_TIME
ackTime或ackCount哪个条件先满足,就commit
MANUAL
listener负责ack,但是背后也是批量上去
MANUAL_IMMEDIATE
listner负责ack,每调用一次,就立即commit
更多推荐
所有评论(0)