1、向Kafka中输入数据,抛异常

WARN async.DefaultEventHandler: Produce request with correlation id 92548048 failed due to [TopicName,1]: org.apache.kafka.common.errors.RecordTooLargeException

官网两个参数描述如下:

message.max.bytesThe maximum size of message that the server can receiveint1000012[0,...]high
fetch.message.max.bytes1024 * 1024The number of byes of messages to attempt to fetch for each topic-partition in each fetch request. These bytes will be read into memory for each partition, so this helps control the memory used by the consumer. The fetch request size must be at least as large as the maximum message size the server allows or else it is possible for the producer to send messages larger than the consumer can fetch.

message.max.bytes:server能接受消息体的最大值。

fetch.message.max.bytes:consumer从partition中获取消息体放入内存中,这个参数控制conusmer所用的内存大小。如果message.max.bytes大于fetch.message.max.bytes,就会导致consumer分配的内存放不下一个message。

因此,在server.properties中添加配置项

1

2

3

4

#broker能接收消息的最大字节数

message.max.bytes=20000000

#broker可复制的消息的最大字节数

fetch.message.max.bytes=20485760

如果不想修改配置文件,可以采用修改topic配置的方法,与server配置项message.max.bytes对应的topic配置项是max.message.bytes

bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic
    --config max.message.bytes=128000

注意:需要修改所有节点的配置文件并且重启kafka,这样重新选举主节点的时候能够读取最新的配置文件,并且在以后主节点切换的时候读取的配置文件都是同一个配置。

2、读取Kafka中数据,有异常

 

kafka.common.MessageSizeTooLargeException: Found a message larger than the maximum fetch size of this consumer on topic TopicName partition 0 at fetch offset 42057452. Increase the fetch size, or decrease the maximum message size the broker will allow.

这个与Kafka消费者的参数fetch.message.max.bytes有关,

在增加message.max.bytes之后,表示进入Kafka消息体变大,此时控制消费者接受消息大小的参数也要有相应变化,

我使用Flume读取Kafka中消息,从而我Flume Agent的配置文件中会有如下配置

consumer.sources.sourcename.kafka.fetch.message.max.bytes=20485760

 

文章来源:http://blog.51cto.com/10120275/1844461

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐