Canal Data Synchronization Failure
1. Background
- 2022-01-25 16:54 R&D reported an anomaly in order data synchronization
- 2022-01-25 17:10 Root cause located: the sync program failed because the message body it sent exceeded the maximum size limit
- 2022-01-25 17:25 Configuration optimized and services restarted
- 2022-01-25 17:34 Data verified; delayed messages were automatically replayed
2. Anomaly Analysis
1. Error analysis
ERROR c.a.o.canal.connector.kafka.producer.CanalKafkaProducer - java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1350613 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1350613 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
- The error message shows that the message body exceeded the maximum size the Kafka producer accepts
- Checking the configuration, the default maximum is 1048576 bytes (a 1 MB cap chosen to protect throughput)
The offending message was located: a single order message was too large, and its longest field is the quarantine address, which allows up to 65535 bytes
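As a back-of-the-envelope check (the class and method names below are illustrative, not Canal's code), measuring a record's UTF-8 size against the 1048576-byte default shows how a long text field pushes a message over the limit:

```java
import java.nio.charset.StandardCharsets;

public class MessageSizeCheck {
    // Kafka producer default for max.request.size: 1 MiB
    static final int MAX_REQUEST_SIZE = 1_048_576;

    // Returns true if the payload's serialized UTF-8 size fits in one request
    static boolean fitsInRequest(String payload) {
        return payload.getBytes(StandardCharsets.UTF_8).length <= MAX_REQUEST_SIZE;
    }

    public static void main(String[] args) {
        // A Chinese character is 3 bytes in UTF-8, so ~450k characters
        // serialize to ~1.35 MB -- the same order as the 1350613-byte error
        String bigRow = "检".repeat(450_000);
        System.out.println(fitsInRequest(bigRow)); // false
        System.out.println(fitsInRequest("ok"));   // true
    }
}
```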
2. Source code analysis
The check in the Kafka client source, org.apache.kafka.clients.producer.KafkaProducer#ensureValidRecordSize:
private void ensureValidRecordSize(int size) {
    if (size > this.maxRequestSize) {
        throw new RecordTooLargeException("The message is " + size + " bytes when serialized which is larger than " + this.maxRequestSize + ", which is the value of the " + "max.request.size" + " configuration.");
    } else if ((long) size > this.totalMemorySize) {
        throw new RecordTooLargeException("The message is " + size + " bytes when serialized which is larger than the total memory buffer you have configured with the " + "buffer.memory" + " configuration.");
    }
}
Kafka first checks whether the current message's size exceeds maxRequestSize; if it does, the exception is thrown immediately.
Where the default values come from (KafkaProducer constructor):
...
ClusterResourceListeners clusterResourceListeners = this.configureClusterResourceListeners(keySerializer, valueSerializer, interceptorList, reporters);
this.maxRequestSize = config.getInt("max.request.size");
this.totalMemorySize = config.getLong("buffer.memory");
this.compressionType = CompressionType.forName(config.getString("compression.type"));
this.maxBlockTimeMs = config.getLong("max.block.ms");
int deliveryTimeoutMs = configureDeliveryTimeout(config, this.log);
...
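To make the two-branch check concrete, here is a self-contained re-creation (not Kafka's actual class; the constants are hard-coded to the documented defaults, max.request.size = 1048576 and buffer.memory = 33554432), applied to the 1350613-byte message from the error log:

```java
public class EnsureValidRecordSize {
    // Documented Kafka defaults (assumed here) for the two limits
    static final int MAX_REQUEST_SIZE = 1_048_576;
    static final long BUFFER_MEMORY = 33_554_432L;

    // Mirrors the order of the checks in KafkaProducer#ensureValidRecordSize:
    // the per-request limit is tested before the total buffer limit
    static String check(int size) {
        if (size > MAX_REQUEST_SIZE) {
            return "RecordTooLargeException: exceeds max.request.size";
        } else if ((long) size > BUFFER_MEMORY) {
            return "RecordTooLargeException: exceeds buffer.memory";
        }
        return "ok";
    }

    public static void main(String[] args) {
        System.out.println(check(1_350_613)); // the size reported in the log
        System.out.println(check(512_000));   // a record under the default limit
    }
}
```

With the defaults, any record over 1 MiB fails on the first branch before the buffer.memory branch is ever reached, which matches the exception text seen in the incident.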
3. Mitigation
1. Update the canal configuration
- Default value:
kafka.max.request.size = 1048576
- Changed to:
kafka.max.request.size = 10485760
(We will probe for a more suitable value during off-peak hours, keeping performance as high as possible while the business stays healthy.)
2. Update the Kafka configuration
- Existing value:
message.max.bytes = 10485760
- Changed to:
message.max.bytes = 20971520
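For the two changes to work together, the producer-side limit must not exceed the broker-side one. The fragment below restates the new values and adds replica.fetch.max.bytes, which is not mentioned in the original change but must also cover the largest message or followers cannot replicate it (treat that line as a suggestion to verify, not part of the incident fix):

```properties
# canal.properties (producer side)
kafka.max.request.size = 10485760

# Kafka broker server.properties -- must be >= the producer's max.request.size
message.max.bytes = 20971520
# Suggested check: replication fetch size should also cover the largest message
replica.fetch.max.bytes = 20971520
```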
3. Verify the data and assess the impact
2022-01-25 17:27:53.606 [destination = order_center , address = /10.106.176.63:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just last position
{"identity":{"slaveId":-1,"sourceAddress":{"address":"10.106.176.63","port":3306}},"postion":{"gtid":"","included":false,"journalName":"mysql-bin.000087","position":464718685,"serverId":17663,"timestamp":1643097681000}}
2022-01-25 17:27:54.158 [destination = order_center , address = /10.106.176.63:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000087,position=464718685,serverId=17663,gtid=,timestamp=1643097681000] cost : 619ms , the next step is binlog dump
- Data resumed syncing from the error position; no data was lost
- Roughly 400 rows were delayed, with a maximum delay of 40 minutes

5. Retrospective
1. How large a message can we support now?
- Up to 10 MB per message (at 3 bytes per character, roughly 3.5 million characters)
2. Could data synchronization in other systems hit the same problem?
- Similar risks exist; we will audit and fix the systems one by one
3. Are the default parameters of other components reasonable? Select the important parameters and benchmark them
- An evaluation was done when the platform was first built; from now on we will run a parameter-tuning review every month together with the platform team
- Read through all of the component's issues to head off known risks: https://github.com/alibaba/canal/issues?q=is%3Aissue+is%3Aopen+sort%3Acomments-desc

6. Core metrics to monitor
Reference: https://github.com/alibaba/canal/wiki/Prometheus-QuickStart
- blocking: blocking metrics
- dump: share of time the dump thread is blocked, clamp_max(rate(canal_instance_sink_blocking_time{destination="example"}[2m]), 1000) / 10
- sink: share of time the sink thread is blocked, clamp_max(rate(canal_instance_publish_blocking_time{destination="example"}[2m]), 1000) / 10
- When the following patterns appear, the likely causes are:
- dump blocking ratio and sink blocking ratio both high: the client is consuming slowly
- dump blocking ratio high, sink blocking ratio low: the canal server's parser is slow
- TPS (MySQL transaction)
- TPS of transactions handled by the Canal instance, measured at TRANSACTION_END events
- TPS (Table row)
- tableRows TPS of store put operations
- tableRows TPS of client get operations
- tableRows TPS of client ack operations

Appendix: other key performance parameters
batch.size and linger.ms:
- batch.size: this value has a large impact on producer throughput. The default is 16384 bytes. Batching lets a group of messages reach the broker in fewer requests, which improves performance on both the client and the server side. Setting it too large, however, can waste memory; it is usually paired with a buffer size, the buffer.memory parameter, so in production it should be tuned to the actual workload.
- linger.ms: this parameter works together with batch.size. If both are configured, the batch is sent once linger.ms has elapsed even if it has not reached batch.size. In other words, as soon as either condition (batch.size reached, or linger.ms elapsed) is satisfied, the messages are sent to the broker.
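The batch.size / linger.ms interplay described above can be modeled in a few lines. The class below is a toy model, not Kafka's accumulator: a batch becomes sendable as soon as either its accumulated bytes reach batchSize or lingerMs has elapsed since the first append.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchingModel {
    final int batchSizeBytes;  // models batch.size
    final long lingerMs;       // models linger.ms
    final List<byte[]> batch = new ArrayList<>();
    int batchBytes = 0;
    long firstAppendMs = -1;   // timestamp of the first record in the batch

    BatchingModel(int batchSizeBytes, long lingerMs) {
        this.batchSizeBytes = batchSizeBytes;
        this.lingerMs = lingerMs;
    }

    /** Appends a record; returns true when the batch is ready to send. */
    boolean append(byte[] rec, long nowMs) {
        if (firstAppendMs < 0) firstAppendMs = nowMs;
        batch.add(rec);
        batchBytes += rec.length;
        return ready(nowMs);
    }

    /** Ready when EITHER the size condition OR the linger condition holds. */
    boolean ready(long nowMs) {
        return batchBytes >= batchSizeBytes
                || (firstAppendMs >= 0 && nowMs - firstAppendMs >= lingerMs);
    }
}
```

Tuning is then a trade-off: a larger batch.size amortizes per-request overhead but holds more memory, while a larger linger.ms trades latency for fuller batches.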