1. 背景

  • 2022-01-25 16:54 收到研发反馈订单同步出现异常

  • 2022-01-25 17:10 定位到问题,同步程序报错发送的消息体超过最大限制

  • 2022-01-25 17:25 完成配置优化及重启

  • 2022-01-25 17:34 完成数据验证,延迟消息已自动补回

2. 异常分析

1. 报错分析

ERROR c.a.o.canal.connector.kafka.producer.CanalKafkaProducer - java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1350613 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1350613 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
  1. 从报错信息中可以看到,发送的消息体超过了kafka生产者接受的最大消息体大小

  2. 查看配置信息,默认最大大小为1048576(为提高吞吐能力,默认限制1兆)

定位到具体消息:订单单次发送消息体过大,其中最长字段为检疫地址,该字段最大长度限制为65535 bytes

2. 源码剖析

源码中的判断逻辑,org.apache.kafka.clients.producer$ensureValidRecordSize

private void ensureValidRecordSize(int size) {
    if (size > this.maxRequestSize) {
        throw new RecordTooLargeException("The message is " + size + " bytes when serialized which is larger than " + this.maxRequestSize + ", which is the value of the " + "max.request.size" + " configuration.");
    } else if ((long)size > this.totalMemorySize) {
        throw new RecordTooLargeException("The message is " + size + " bytes when serialized which is larger than the total memory buffer you have configured with the " + "buffer.memory" + " configuration.");
    }
}

kafka会先去判断本次消息的大小,是否超过maxRequestSize的大小,如果超过就直接抛出异常了

默认值的大小

...
ClusterResourceListeners clusterResourceListeners = this.configureClusterResourceListeners(keySerializer, valueSerializer, interceptorList, reporters);
this.maxRequestSize = config.getInt("max.request.size");
this.totalMemorySize = config.getLong("buffer.memory");
this.compressionType = CompressionType.forName(config.getString("compression.type"));
this.maxBlockTimeMs = config.getLong("max.block.ms");
int deliveryTimeoutMs = configureDeliveryTimeout(config, this.log);
...

3. 处理措施

1. 修改canal配置

  • 默认值

kafka.max.request.size = 1048576

  • 修改为

kafka.max.request.size = 10485760 

(在业务低峰探索优化至更合适值,在保证业务正常的情况下尽可能保证性能)

2. 修改Kafka配置

  • 默认配置

message.max.bytes = 10485760

  • 修改为

message.max.bytes = 20971520

3. 核对数据,评估影响

2022-01-25 17:27:53.606 [destination = order_center , address = /10.106.176.63:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just last position
 {"identity":{"slaveId":-1,"sourceAddress":{"address":"10.106.176.63","port":3306}},"postion":{"gtid":"","included":false,"journalName":"mysql-bin.000087","position":464718685,"serverId":17663,"timestamp":1643097681000}}
2022-01-25 17:27:54.158 [destination = order_center , address = /10.106.176.63:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000087,position=464718685,serverId=17663,gtid=,timestamp=1643097681000] cost : 619ms , the next step is binlog dump

6. 监控核心指标

参考文档:https://github.com/alibaba/canal/wiki/Prometheus-QuickStart

参考资料

附:其他性能关键参数

batch.size和linger.ms这两个参数:

  1. 数据从报错位置开始同步,未造成数据丢失

  2. 约400条数据延迟,最大延迟40分钟

  3. 5. 反思总结

  4. 当前最大能支撑消息是多大?

  5. 每次消息最大10M(1字符 = 3字节,约等于350万字符)

  6. 2.  全局考虑其他系统数据同步是否会有类似问题

  7. 存在类似风险,逐个排查修复

  8. 3.  其他组件的默认参数是否合理?选取重要参数做性能评估

  9. 搭建之初进行过一次评估,后续每月与平台搭建成员共同进行一次参数优化评估

  10. 阅读组件相关的全部issue,尽可能规避风险 
    https://github.com/alibaba/canal/issues?q=is%3Aissue+is%3Aopen+sort%3Acomments-desc

  11. blocking:阻塞指标

    1. dump:dump线程blocking时间占比,clamp_max(rate(canal_instance_sink_blocking_time{destination="example"}[2m]), 1000) / 10

    2. sink:sink线程blocking时间占比,clamp_max(rate(canal_instance_publish_blocking_time{destination="example"}[2m]), 1000) / 10

    3. 出现下述情况,则原因可能为:

      1. dump blocking ratio、sink blocking ratio占比高:client消费速度慢

      2. dump blocking ratio占比高、sink blocking ratio占比低:canal server parser解析数据慢

  12. TPS(MySQL transaction)

    1. Canal instance处理transaction的TPS,以TRANSACTION_END事件为基准

  13. TPS(Table row)

    1. store put操作的tableRows TPS

    2. client get操作的tableRows TPS

    3. client ack操作的tableRows TPS

  14. https://github.com/alibaba/canal

  15. batch.size : 该值对kafka的producer的吞吐量有很大的影响,可以看到默认值是16384 bytes,可以通过一批消息通过较少的请求发送到broker,这样的话无论是对于client端还是server端,都会有性能提升。但是该值设置的过大的,肯能会对内存造成浪费,通常我恩还会设置一个缓冲区大小,也就是buffer.memory这个参数,所以生产环境还是需要根据实际情况进行合理的调整

  16. linger.ms : 该参数配置batch.size使用,也就是说,如果配置了该参数以及上面的这个参数,那么无论你batch的大小是否达到size的大小,只要时间到该参数指定的值后,batch中的消息同样会被发送到broker,也就是说,linger.ms与batch.size二者只要其中一个条件满足,消息则会被发送到broker

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐