问题1:nested exception is org.apache.kafka.common.errors.RecordTooLargeException:

异常类: nested exception is org.apache.kafka.common.errors.RecordTooLargeException:

很明显,消息体目前过大。需要提高Kafka服务器能够接收的最大数据大小。

解决方法:提高Kafka服务器单条消息最大大小。配置参数 message.max.bytes

我的方法:生产环境大概每份文件不超过5M,调整服务器broker参数message.max.bytes=510241024

问题2:Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 101: org.apache.kafka.common.errors.DisconnectException.

异常类:org.apache.kafka.common.errors.DisconnectException

异常类描述:Server disconnected before a request could be completed.

[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO o.a.k.clients.FetchSessionHandler[442] - [Consumer clientId=consumer-1, groupId=auto-dev] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 101: org.apache.kafka.common.errors.DisconnectException.

问题定位:由于消息读写频率较高,因此kafka服务器的负载达到上限。所以有些消费者实例一直就poll不到数据。达到配置的时间【session.timeout.ms】、【request.timeout.ms】,消费者会被踢出Kafka。因此日志不断打印该错误。(其实另一方面也提示消费时间过长)

解决方法:提高consumer参数【session.timeout.ms】、【request.timeout.ms】的值。

我的方法:在application.yaml中为【session.timeout.ms】、【request.timeout.ms】这两个参数均调整到4分钟,即240000ms。

问题3:TimeoutException: Expiring 1 record(s) … has passed since batch creation

问题定位:消息已经投递到kafka,但是kafka来不及处理。提示kafka写性能达到瓶颈。

我的方法:可以从两方面修改。

(1)控制代码发送频率,即控制调用kafkaTemplate.send方法的频率。作为有经验的开发人员,不要问我怎么控制发送频率。代码里加入Thread.sleep就可以不是么

(2)在生产者参数中提高【delivery.timeout.ms】的时间。默认2分钟kafka没有响应则报异常。我们可以增加该时间,单位毫秒。但是有上限,不能超过【request.timeout.ms】与【linger.ms】之和。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐