kafka传递大消息体时的配置

created by cms/fangchangtan | 2020/09/30

1.生产端设置

//数据太大,发送失败,设置 max.request.size
props.put("max.request.size", "300000000");

/*socket 在读写数据时用到的TCP 缓冲区也可以设置大小。如果它们被设为-1 ,就使用操作系统的默认值。如果生产者或消费者与broker 处于不同的数据中心内,可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽
*/
props.put("send.buffer.bytes", "300000000");
//kafka发送时候分为同步和异步方式,默认异步方式,会首先将几个消息发送到kafka的客户端buffer缓冲区中,然后凑够一定大小和时间之后,按照一个批次发送到kafka的broker服务上,因此如果你的单条消息大小为M个字节,则需要设置buffer.memory大于M字节才能放下数据,该值默认是32M;
props.put("buffer.memory", "300000000");        

其中

  • max.request.size

    ​ 默认值是 1028576 。该参数用于控制生产者发送的请求大小。它可以指能发送的单个消息的最大值,也可以指单个请求里所有消息总的大小。例如,假设这个值为1MB,那么可以发送的单个最大消息为 1MB ,或者生产者可以在单个请求里发送一个批次,该批次包含了 1000 个消息,每个消息大小为 1MB。另外, broker 对可接收的消息最大值也有自己的限制(message.max.bytes),所以两边的配置最好可以匹配,避免生产者发送的消息被 broker 拒绝 .

    ​ 这个参数限制生产者发送数据包的大小,数据包的大小与消息的大小、消息数相关。如果我们指定了最大数据包大小为1M,那么最大的消息大小为1M,或者能够最多批量发送1000条消息大小为1K的消息。另外,broker也有message.max.bytes参数来控制接收的数据包大小。在实际中,建议这些参数值是匹配的,避免生产者发送了超过broker限定的数据大小。

  • buffer.memory

​ 这个参数设置生产者缓冲发送的消息的内存大小,如果应用调用send方法的速度大于生产者发送的速度,那么调用会阻塞或者抛出异常,具体行为取决于block.on.buffer.full(这个参数在0.9.0.0版本被max.block.ms代替,允许抛出异常前等待一定时间)参数。

  • send.buffer.bytes,receive.buffer.bytes

​ 这两个参数设置用来发送/接收数据的TCP连接的缓冲区,如果设置为-1则使用操作系统自身的默认值。如果生产者与broker在不同的数据中心,建议提高这个值,因为不同数据中心往往延迟比较大。

2.broker端的设置

如果kafka想要接受较大的消息体,比如单条消息的大小平均1M-200M不等,那么我们需要在启动kafka容器的时候就设置好相应的环境变量,是的kafka集群的全局允许接受复制较大的消息体。

如下为docker容器中设置的kafka的环境变量相关的参数:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-I79ii0RO-1601435216747)(20200929-kafka传递大消息体时的配置.assets/image-20200929164732844.png)]

其中参数解释如下:

1.KAFKA_MESSAGE_MAX_BYTES,该参数表示broker的topic分区leader接受数据的时候,允许的单条消息的最大值,默认为1M;

2.KAFKA_REPLICA_FETCH_MAX_BYTES,该参数表示broker端的leader分区在想其他follower分区复制消息时候 ,允许的单条消息的最大值;

3.KAFKA_SOCKET_REQUEST_MAX_BYTES ,该参数表示broker端的规划的缓存区的大小,该缓存区一般要大于1,2中的参数值,表示需要存够一定数据量数据之后开始刷写磁盘;

4.KAFKA_HEAP_OPTS ,该参数表示kafka使用的堆空间的大小,因为kafka本身的进程一个JVM,虽然作为消息系统不依赖于较大的kafka堆空间,但是较小的堆空间不利于中间批量数据的缓存

5.KAFKA_JVM_PERFORMANCE_OPTS

​ 该参数值为:

-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxDirectMemorySize=8192m -Djava.awt.headless=true

​ 表对kafka使用G1垃圾收集器等相关优化配置。

3. 消费端设置

在消费端设置对应的读取大小的限制

kafkaParams.put("fetch.message.max.bytes", "300000000");
kafkaParams.put("max.partition.fetch.bytes", "300000000");
  • max.partition.fetch.bytes:默认值是 1024*1024 。生产环境中建议此参数的值设置的与
    broker 中message.max.bytes 的值一样,或者大于这个值。该属性指定了服务器从每个分区
    里返回给消费者的最大字节数。它的默认值是 1 MB , 也就是说, KafkaConsumer.poll() 方
    住从每个分区里返回的记录最多不超过 max.partition.fetch.bytes 指定的字节。
    max.partition.fetch.bytes 的值必须比 broker 能够接收的最大消息的字节数(通过
    message.max.bytes 属性配置)大, 否则消费者可能无法读取这些消息,导致消费者一直挂
    起重试。
  • fetch.message.max.bytes (默认 1MB) – 消费者能读取的最大消息。这个值应该大于或等于message.max.bytes。

【参考资料】

[kafka中处理超大消息的一些考虑]https://www.cnblogs.com/qiumingcheng/p/5631309.html

《kafka使用与配置总结.pdf》

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐