SparkStreaming程序优化小记
最近公司部署了一个sparkstreaming程序,主要逻辑是处理flume采集到kafka的数据,集群环境3个nodemanager,5核20G内存,刚开始测试阶段并没设置资源配置,直接丢在yarn上运行,每天的数据量大概2500万records。测试几天后发现数据处理时间延迟稍微长了一点,怀疑是程序处理数据的数据低于数据产生的数据,随着时间和数据的增加,这个时间延迟越来越大,遂决定对程序进行相
最近公司部署了一个sparkstreaming程序,主要逻辑是处理flume采集到kafka的数据,集群环境3个nodemanager,5核20G内存,刚开始测试阶段并没设置资源配置,直接丢在yarn上运行,每天的数据量大概2500万records。测试几天后发现数据处理时间延迟稍微长了一点,怀疑是程序处理数据的数据低于数据产生的数据,随着时间和数据的增加,这个时间延迟越来越大,遂决定对程序进行相关的优化,整个过程主要从下面几个方面进行了优化:
1、程序占用资源
3个节点,每个节点配置5cores,20G内存。
spark submit提交的时候,executor-memory设置为2G,每个节点启2个executor,3台节点一共启了5个executor,还有一个供 ApplicationMaster使用。
spark-submit
--master yarn
--conf spark.streaming.concurrentJobs=4
--class com.jiadun.handler.CP_GetWA_Source_1001FromKafka
--executor-memory 2G
--executor-cores 2
--num-executors 5
/data/test/GetDataFromKafka-1.0-SNAPSHOT.jar >/dev/null 2>/data/test/log/wa.log
2、设置程序拉取数据的大小
我启用的是spark的反应机制,动态的控制拉取数据的rate。
conf.set("spark.streaming.backpressure.enabled","true")
当然还可以从下面两种方式中进行控制,如果三种都设置的话,程序会比较它们的大小取最小值
conf.set("spark.streaming.receiver.maxRate","")
conf.set("spark.streaming.kafka.maxRatePerPartition","")
3、配置JVM GC的频率和时间
conf.set("spark.executor.extraJavaOptions","-XX:+UseConcMarkSweepGC")
4、batch interval
这个参数很重要,如果一个interval时间段内你拉取的batch没有处理结束,那么就会出现数据堆积,长此以往,数据会堆积的越来越多,这样的处理逻辑肯定有问题,所以适当的调节interval大小对于程序能不能稳定运行有很大的影响。一般设置在1-10s内,视具体情况而定
5、spark.streaming.concurrentJobs
这是决定你程序同时启动几个active job的参数,如果你的资源足够多,你可以在提交的任务的时候指定这个参数,此参数默认为1,我暂时设置项目的这个参数为4。
观察程序的运行状况 :
可以从sparkUI来观察,
上图中红框内记录了每个batch的执行耗时的折线图和柱状图,其中有个虚线stable,这是你的batch interval时间,如果大量的batch都在这条线以下的话,说明程序的处理速度是足够的,少量的超过是没有问题的。
当然对sparkstreaming程序的优化远不止于此,后期还需要学习更深层次的知识,这样才能更好的去适配各种运行环境,完成健壮性,稳定的项目
更多推荐
所有评论(0)