SparkStreaming直连kafka报错:requirement failed: Failed to get records for spark-executor-xxx topic_xxxx
依次排查后,确认是kafka服务端网络问题,因为sparkStreaming连接kafka走得是公网,数据量大网络不稳定是常有的情况,由于超过spark.streaming.kafka.consumer.poll.ms设置的120s时长,task没获取到对应的topic分区的数据,就会导致类似连接超时的异常,那这个task就fail掉了。:奇了怪了,这个任务之前一直在生产上跑的好好的,突然今天发现
现象:
sparkStreaming直连kafka处理数据发现偶尔有task失败,并且大量job在堆积,消息处理很慢,查看报错:
java.lang.IllegalArgumentException: requirement failed: Failed to get records for spark-executor-xxx topic_xxxx 602651 after polling for 120000
环境:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>3.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
<version>3.3.0</version>
</dependency>
SparkSession.builder()
.config("spark.driver.memory", "512m")
.config("spark.driver.cores", "1")
.config("spark.executor.memory", "4g")
.config("spark.executor.cores", "4")
.config("spark.dynamicAllocation.enabled", "true")
.config("spark.dynamicAllocation.maxExecutors", "10")
.config("spark.streaming.backpressure.enabled", "true")
.config("spark.streaming.kafka.maxRatePerPartition", "10000")
.config("spark.streaming.backpressure.initialRate", "10000")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.sql.catalogImplementation", "in-memory")
.config("spark.streaming.stopGracefullyOnShutdown", "true")
.config("spark.shuffle.service.enabled", "true")
.config("spark.streaming.kafka.consumer.poll.ms", "120000")
.getOrCreate()
窗口时间:5S
分析过程:
奇了怪了,这个任务之前一直在生产上跑的好好的,突然今天发现大批量的task在pending中,而且每个批次的job执行的时间都超过了窗口时间。网上一查,有篇文章说修改:
减小spark.streaming.kafka.consumer.poll.ms参数到3000ms以内,即3秒超时就重试
将spark.task.maxFailures改为10,加大重试次数
这种修改就是就是会导致失败后kafka数据出现重复读取的情况,不符合Exactly once语义,明显就不适合我,我程序里面使用的是Direct模式直连kafka,重试次数默认是1,万一不懂的小白真的随意这么改,真的是害人不浅,对于我这种模式,能接受处理数据慢一点,但是不能接受少数据或者数据重复。
还有解决方案说将kafka的一个叫reconnect.backoff.ms的参数值设为0,这个修改也是扯淡,没看到对这个参数的详细说明就这么设置。
还有说是kafka client版本问题,我这个版本是spark-streaming-kafka-0-10_2.12包自带的2.8.1的版本,只要确保kafka client是2.4及以上的版本就行。
还有说是什么堆内堆外内存的,整的花里胡哨的,一般都是够的
还有说spark.streaming.concurrentJobs=2甚至更高的,并发提交运行job,这个性能是高了点,但是影响消息顺序,而且我程序里面每次都提交offset,这样offset会容易乱
算了,网上一堆大聪明误人子弟,还是自己分析排查,重点排查以下:
1,代码处理数据逻辑是否比较复杂导致process time长
2,,代码中是否有未关闭的流(mysql,redis,producer)及时close,是否获取流的时候是单例,广播变量,闭包,线程池等方式
3,下游落库慢导致process time长
4,executor服务器磁盘,网络,cpu是否正常,是否是所有失败的task集中在一台服务器
5,kafka服务器是否正常
6,排查executor内存,cpu是否足够
依次排查后,确认是kafka服务端网络问题,因为sparkStreaming连接kafka走得是公网,数据量大网络不稳定是常有的情况,由于超过spark.streaming.kafka.consumer.poll.ms设置的120s时长,task没获取到对应的topic分区的数据,就会导致类似连接超时的异常,那这个task就fail掉了。
解决方案:
kafka服务端我是没权限处理的,只能在sparkStreaming这头处理了
四个参数修改:
1,设置sparkStreaming窗口批次时间:5分钟
2,设置spark.streaming.kafka.consumer.poll.ms=600000
3,spark.streaming.kafka.maxRatePerPartition=200
4,spark.streaming.backpressure.initialRate=200
实际需要看程序处理消息的时间,不断去调试超时时间,最终找到一个合适的时间,窗口时间和spark.streaming.kafka.maxRatePerPartition还有spark.streaming.backpressure.initialRate是直接相关的,调大了窗口时间,这两个就要适当的缩小
至此再上线后解决问题,这种的话就是接受数据延迟五分钟,不过没办法,对于kafka服务端不稳定的问题没有权限解决,只能苦一苦下游系统了
更多推荐
所有评论(0)