kafka大消息

生产者向broker发送消息体超过限制

The message is 7000158 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.

Exception thrown when sending a message with key='null' and payload='白日依山尽,黄河入海流。 欲穷千里目,更上一层楼白日依山尽,黄河入海流。 欲穷千里目,更上一层楼白日依山尽,黄河入海流。 欲穷千里目,更上一层楼白日依山尽,黄河入海流。 欲穷千里目,更上一层楼白日依山...' to topic kafka_test_topic_4:

org.apache.kafka.common.errors.RecordTooLargeException: The message is 7000158 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.

 

 生产者生产者向broker发送消息体超过限制, 在生产者中设置max.request.size

spring.kafka.producer.max-request-size
properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,maxRequestSize);

broker topic能接受的消息超过限制

The request included a message larger than the max message size the server will accept.

 

2020-12-24 11:34:51.973 ERROR 9928 --- [ad | producer-1] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='null' and payload='白日依山尽,黄河入海流。 欲穷千里目,。...' to topic kafka_test_topic_4:

org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.

 broker topic能接受的消息超过限制,设置topic属性max.message.bytes;

通过kafka manger设置或者通过kafka bin目录下的命令设置

设置属性
bin/kafka-topics.sh --alter --topic kafka_test_topic_3 --config max.message.bytes=10485760  --zookeeper localhost:2181
/ kafka-topics.sh 过期,使用kafka-configs.sh 
kafka-configs.sh --alter --entity-type topics --entity-name kafka_test_topic_3 --add-config max.message.bytes=10485760  --zookeeper localhost:2181

broker全局接收的消息超过限制

接受到请求:白日依山尽,黄河入海流。 欲穷千里目,更上一层楼
2020-12-24 11:24:37.648  WARN 11676 --- [ad | producer-1] o.a.k.clients.producer.internals.Sender  : [Producer clientId=producer-1] Got error produce response with correlation id 16 on topic-partition kafka_test_topic_3-0, retrying (2 attempts left). Error: NETWORK_EXCEPTION
2020-12-24 11:24:37.649  WARN 11676 --- [ad | producer-1] o.a.k.clients.producer.internals.Sender  : [Producer clientId=producer-1] Received invalid metadata error in produce request on partition kafka_test_topic_3-0 due to org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.. Going to request metadata update now
2020-12-24 11:24:37.757  WARN 11676 --- [ad | producer-1] o.a.k.clients.producer.internals.Sender  : [Producer clientId=producer-1] Got error produce response with correlation id 19 on topic-partition kafka_test_topic_3-0, retrying (1 attempts left). Error: NETWORK_EXCEPTION
2020-12-24 11:24:37.757  WARN 11676 --- [ad | producer-1] o.a.k.clients.producer.internals.Sender  : [Producer clientId=producer-1] Received invalid metadata error in produce request on partition kafka_test_topic_3-0 due to org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.. Going to request metadata update now
2020-12-24 11:24:37.865  WARN 11676 --- [ad | producer-1] o.a.k.clients.producer.internals.Sender  : [Producer clientId=producer-1] Got error produce response with correlation id 22 on topic-partition kafka_test_topic_3-0, retrying (0 attempts left). Error: NETWORK_EXCEPTION
2020-12-24 11:24:37.866  WARN 11676 --- [ad | producer-1] o.a.k.clients.producer.internals.Sender  : [Producer clientId=producer-1] Received invalid metadata error in produce request on partition kafka_test_topic_3-0 due to org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.. Going to request metadata update now
2020-12-24 11:24:37.974 ERROR 11676 --- [ad | producer-1] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='null' and payload='白日依山尽,黄河入海流。 欲穷千里目,更上一层楼白日依山尽,黄河入海流。 欲穷千里目,更上一层楼白日依山尽,黄河入海流。 欲穷千里目,更上一层楼白日依山尽,黄河入海流。 欲穷千里目,更上一层楼白日依山...' to topic kafka_test_topic_3:

org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.

2020-12-24 11:24:37.974  WARN 11676 --- [ad | producer-1] o.a.k.clients.producer.internals.Sender  : [Producer clientId=producer-1] Received invalid metadata error in produce request on partition kafka_test_topic_3-0 due to org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.. Going to request metadata update now

 

设置broker的config的server.properties  socket.request.max.bytes=10485760

总结:

生产者向kafka发送大消息,只需要在生产者做设置max.message.bytes即可;

而broker要接受大消息,需要配置文件server.properties 设置最大消息 socket.request.max.bytes=10485760,同时topic的消息大小优先于socket.request.max.bytes,topic也许设置参数socket.request.max.bytes。

kafka压缩

如果消息过大,启用压缩也许是一种好方法

压缩种类查看kafka类CompressionType

 

 生产者设置压缩设置如下

spring.kafka.producer.compression-type: none    # none  lz4  gzip  snappy
public static final String COMPRESSION_TYPE_CONFIG = "compression.type";
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType);

Broker端也有compression.type参数,默认值是producer,表示Broker端会尊重Producer端使用的压缩算法一旦Broker端设置了不同的compression.type,可能会发生预料之外的压缩/解压缩操作,导致CPU使用率飙升

 

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐