Spring Boot 集成 Kafka 遇到问题
问题1:nested exception is org.apache.kafka.common.errors.RecordTooLargeException:异常类: nested exception is org.apache.kafka.common.errors.RecordTooLargeException:很明显,消息体目前过大。需要提高Kafka服务器能够接收的最大数据大小。解决方法:
问题1:nested exception is org.apache.kafka.common.errors.RecordTooLargeException:
异常类: nested exception is org.apache.kafka.common.errors.RecordTooLargeException:
很明显,消息体目前过大。需要提高Kafka服务器能够接收的最大数据大小。
解决方法:提高Kafka服务器单条消息最大大小。配置参数 message.max.bytes
我的方法:生产环境大概每份文件不超过5M,调整服务器broker参数message.max.bytes=510241024
问题2:Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 101: org.apache.kafka.common.errors.DisconnectException.
异常类:org.apache.kafka.common.errors.DisconnectException
异常类描述:Server disconnected before a request could be completed.
[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO o.a.k.clients.FetchSessionHandler[442] - [Consumer clientId=consumer-1, groupId=auto-dev] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 101: org.apache.kafka.common.errors.DisconnectException.
问题定位:由于消息读写频率较高,因此kafka服务器的负载达到上限。所以有些消费者实例一直就poll不到数据。达到配置的时间【session.timeout.ms】、【request.timeout.ms】,消费者会被踢出Kafka。因此日志不断打印该错误。(其实另一方面也提示消费时间过长)
解决方法:提高consumer参数【session.timeout.ms】、【request.timeout.ms】的值。
我的方法:在application.yaml中为【session.timeout.ms】、【request.timeout.ms】这两个参数均调整到4分钟,即240000ms。
问题3:TimeoutException: Expiring 1 record(s) … has passed since batch creation
问题定位:消息已经投递到kafka,但是kafka来不及处理。提示kafka写性能达到瓶颈。
我的方法:可以从两方面修改。
(1)控制代码发送频率,即控制调用kafkaTemplate.send方法的频率。作为有经验的开发人员,不要问我怎么控制发送频率。代码里加入Thread.sleep就可以不是么
(2)在生产者参数中提高【delivery.timeout.ms】的时间。默认2分钟kafka没有响应则报异常。我们可以增加该时间,单位毫秒。但是有上限,不能超过【request.timeout.ms】与【linger.ms】之和。
更多推荐
所有评论(0)