Spark核心知识,参数配置,内存优化,常见问题大全
Spark基础篇1、Spark有哪两种算子?2、Spark有哪些聚合类的算子,我们应该尽量避免什么类型的算子?3、如何从Kafka中获取数据?4、RDD创建有哪几种方式?5、Spark并行度怎么设置比较合适?6、Spark如何处理不能被序列化的对象?7、collect功能是什么,其底层是怎么实现的?8、为什么Spark Application在没有获得足够的资源,job就开始执行了,可能会导致什么
Spark核心知识,参数配置,内存优化,常见问题大全
- 【spark优化】spark程序开发调优,实际开发注意事项
- 【spark优化】spark资源参数调优,各资源参数生产调优
- 【spark优化】Spark Shuffle配置调优,生产shuffle参数调优
- 【spark优化】spark数据倾斜调优,各场景实践,方案优劣势
- 1、Spark技术栈有哪些组件,每个组件都有什么功能,适合什么应用场景?
- 2、Spark中standalone模式特点,有哪些优点和缺点?
- 3、如何理解Standalone模式下,Spark资源分配是粗粒度的?
- 4、Spark RDD 和 MapReduce2的区别?
- 5、spark和Mapreduce快? 为什么快呢? 快在哪里呢?
- 6、Spark为什么比mapreduce快?
- 7、Mapreduce和Spark的都是并行计算,那么他们有什么相同和区别?
- 8、Spark on Yarn 模式有哪些优点?
- 9、Spark on Mesos中,什么是粗粒度分配,什么是细粒度分配,优点和缺点是什么?
- 10、spark on yarn Cluster 模式下,ApplicationMaster和driver是在同一个进程么?
- 11、运行在yarn中Application有几种类型的container?
- 12、Yarn中的container是由谁负责销毁的,在Hadoop Mapreduce中container可以复用么?
- 13、描述Yarn执行一个任务的过程?
- 14、spark的有几种部署模式,每种模式特点?
- 15、spark工作机制?
- 16、spark的优化怎么做?
- 17、Spark sql为什么比hive快呢?
- 18、谈谈你对container的理解?
- 19、Spark应用程序的执行过程是什么?
- 20、提交任务时,如何指定Spark Application的运行模式?
- 21、不启动Spark集群Master和work服务,可不可以运行Spark程序?
- 22、driver的功能是什么?
- 23、Spark中Worker的主要工作是什么?
- 24、为什么Spark Application在没有获得足够的资源,job就开始执行了,可能会导致什么问题发生?
- 25、Executor启动时,资源通过哪几个参数指定?
- 26、一个task的map数量由谁来决定?
- 27、 谈谈你对Hadoop生态的认识。
- 28、RDD创建有哪几种方式?
- 29、RDD的弹性表现在哪几点?
- 30、RDD有哪些缺陷?
- 31、什么是RDD宽依赖和窄依赖?
- 32、RDD的数据结构是怎么样的?
- 33、RDD算子里操作一个外部map,比如往里面put数据,然后算子外再遍历map,会有什么问题吗?
- 34、union操作是产生宽依赖还是窄依赖?
- 35、窄依赖父RDD的partition和子RDD的parition是不是都是一对一的关系?
- 36、 RDD通过Linage(记录数据更新)的方式为何很高效?
- 37、什么是shuffle,以及为什么需要shuffle?
- 38、简单说一下hadoop和spark的shuffle相同和差异?
- 39、Spark的shuffle过程?
- 40、不需要排序的hash shuffle是否一定比需要排序的sort shuffle速度快?
- 41、Sort-based shuffle的缺陷?
- 42、Spark有哪两种算子?
- 43、Spark有哪些聚合类的算子,我们应该尽量避免什么类型的算子?
- 44、map与flatMap的区别?
- 45、reduceByKey是不是action?
- 46、collect功能是什么,其底层是怎么实现的?
- 47、为什么要进行序列化序列化?
- 48、Spark如何处理不能被序列化的对象?
- 49、cache和pesist的区别?
- 50、 cache后面能不能接其他算子,它是不是action操作?
- 51、Spark为什么要持久化,一般什么场景下要进行persist操作?
- 52、列出你所知道的调度器,说明其工作原理?
- 53、FIFO调度模式的基本原理、优点和缺点?
- 54、FAIR调度模式的优点和缺点?
- 55、CAPCACITY调度模式的优点和缺点?
- 56、spark hashParitioner的弊端是什么?
- 57、RangePartitioner分区的原理?
- 58、Spark中的HashShufle的有哪些不足?
- 59、 conslidate是如何优化Hash shuffle时在map端产生的小文件?
- 60、介绍parition和block有什么关联关系?
- 61、hbase region多大会分区,spark读取hbase数据是如何划分partition的?
- 62、spark.default.parallelism这个参数有什么意义,实际生产中如何设置?
- 63、spark.shuffle.memoryFraction参数的含义,以及优化经验?
- 64、spark.storage.memoryFraction参数的含义,实际生产中如何调优?
- 65、常见的数压缩方式,你们生产集群采用了什么压缩方式,提升了多少效率?
- 66、Spark使用parquet文件存储格式能带来哪些好处?
- 67、如何从Kafka中获取数据?
- 68、导致Executor产生FULL gc的原因,可能导致什么问题?
- 69、Spark累加器有哪些特点?
- 70、Hadoop中,Mapreduce操作的mapper和reducer阶段相当于spark中的哪几个算子?
- 71、Spark并行度怎么设置比较合适?
- 72、数据本地性是在哪个环节确定的?
- 73、 Spark的数据本地性有哪几种?
- 74、介绍一下join操作优化经验?
- 75、介绍一下你对Unified Memory Management内存管理模型的理解?
【spark优化】spark程序开发调优,实际开发注意事项
【spark优化】spark资源参数调优,各资源参数生产调优
【spark优化】Spark Shuffle配置调优,生产shuffle参数调优
Spark Shuffle配置调优,生产shuffle参数调优
【spark优化】spark数据倾斜调优,各场景实践,方案优劣势
1、Spark技术栈有哪些组件,每个组件都有什么功能,适合什么应用场景?
可以画一个这样的技术栈图先,然后分别解释下每个组件的功能和场景
1)Spark core
:是其它组件的基础,spark的内核,主要包含:有向循环图、RDD、Lingage、Cache、broadcast等,并封装了底层通讯框架,是Spark的基础。
2)SparkStreaming
:是一个对实时数据流进行高通量、容错处理的流式处理系统,可以对多种数据源(如Kafka、Flume、Twitter、Zero和TCP 套接字)进行类似Map、Reduce和Join等复杂操作,将流式计算分解成一系列短小的批处理作业。
3)Spark sql
:Shark是SparkSQL的前身,Spark SQL的一个重要特点是其能够统一处理关系表和RDD,使得开发人员可以轻松地使用SQL命令进行外部查询,同时进行更复杂的数据分析。
4)BlinkDB
:是一个用于在海量数据上运行交互式 SQL 查询的大规模并行查询引擎,它允许用户通过权衡数据精度来提升查询响应时间,其数据的精度被控制在允许的误差范围内。
5)MLBase
:是Spark生态圈的一部分专注于机器学习,让机器学习的门槛更低,让一些可能并不了解机器学习的用户也能方便地使用MLbase。
MLBase分为四部分:MLlib、MLI、ML Optimizer和MLRuntime。
6)GraphX
:是Spark中用于图和图并行计算。
2、Spark中standalone模式特点,有哪些优点和缺点?
1)特点:
-
standalone是master/slave架构,集群由Master与Worker节点组成,程序通过与Master节点交互申请资源,Worker节点启动Executor运行;
-
standalone调度模式使用FIFO调度方式;
-
无依赖任何其他资源管理系统,Master负责管理集群资源。
2)优点:
-
部署简单;
-
不依赖其他资源管理系统。
3)缺点:
-
默认每个应用程序会独占所有可用节点的资源,当然可以通过spark.cores.max来决定一个应用可以申请的CPU cores个数;
-
可能有单点故障,需要自己配置master HA。
3、如何理解Standalone模式下,Spark资源分配是粗粒度的?
spark默认情况下资源分配是粗粒度的,也就是说程序在提交时就分配好资源,后面执行的时候使用分配好的资源,除非资源出现了故障才会重新分配。
比如Spark shell启动,已提交,一注册,哪怕没有任务,worker都会分配资源给executor。
4、Spark RDD 和 MapReduce2的区别?
1)mr2只有2个阶段,数据需要大量访问磁盘,数据来源相对单一 ,spark RDD,可以无数个阶段进行迭代计算,数据来源非常丰富,数据落地介质也非常丰富spark计算基于内存;
2)MapReduce2需要频繁操作磁盘IO,需要明确的是如果是SparkRDD的话,你要知道每一种数据来源对应的是什么,RDD从数据源加载数据,将数据放到不同的partition,针对这些partition中的数据进行迭代式计算,计算完成之后,落地到不同的介质当中。
5、spark和Mapreduce快? 为什么快呢? 快在哪里呢?
Spark更加快的主要原因有几点:
1)基于内存计算,减少低效的磁盘交互;
2)高效的调度算法,基于DAG;
3)容错机制 Lineage,主要是DAG和Linage,即使spark不使用内存技术,也大大快于mapreduce。
6、Spark为什么比mapreduce快?
1)基于内存计算,减少低效的磁盘交互;
2)高效的调度算法,基于DAG;
3)容错机制Linage,精华部分就是DAG和Lingae
7、Mapreduce和Spark的都是并行计算,那么他们有什么相同和区别?
两者都是用MR模型
来进行并行计算:
1)hadoop的一个作业称为job,job里面分为map task
和reduce task
,每个task都是在自己的进程中运行的,当task结束时,进程也会结束。
hadoop的job只有map和reduce操作,表达能力比较欠缺而且在mr过程中会重复的读写hdfs,造成大量的io操作,多个job需要自己管理关系。
2)spark用户提交的任务成为application
,一个application对应一个SparkContext
,app中存在多个job,每触发一次action操作就会产生一个job。
这些job可以并行或串行执行,每个job中有多个stage,stage是shuffle过程中DAGSchaduler通过RDD之间的依赖关系划分job而来的,每个stage里面有多个task,组成taskset有TaskSchaduler分发到各个executor中执行,executor的生命周期是和app一样的,即使没有job运行也是存在的,所以task可以快速启动读取内存进行计算。
spark的迭代计算都是在内存中进行的,API中提供了大量的RDD操作如join,groupby等,而且通过DAG图可以实现良好的容错。
8、Spark on Yarn 模式有哪些优点?
1)与其他计算框架共享集群资源(Spark框架与MapReduce框架同时运行,如果不用Yarn进行资源分配,MapReduce分到的内存资源会很少,效率低下);资源按需分配,进而提高集群资源利用等。
2)相较于Spark自带的Standalone模式,Yarn的资源分配更加细致。
3)Application部署简化,例如Spark,Storm等多种框架的应用由客户端提交后,由Yarn负责资源的管理和调度,利用Container作为资源隔离的单位,以它为单位去使用内存,cpu等。
4)Yarn通过队列的方式,管理同时运行在Yarn集群中的多个服务,可根据不同类型的应用程序负载情况,调整对应的资源使用量,实现资源弹性管理。
9、Spark on Mesos中,什么是粗粒度分配,什么是细粒度分配,优点和缺点是什么?
1)粗粒度:启动时就分配好资源, 程序启动,后续具体使用就使用分配好的资源,不需要再分配资源;
优点
:作业特别多时,资源复用率高,适合粗粒度;
缺点
:容易资源浪费,假如一个job有1000个task,完成了999个,还有一个没完成,那么使用粗粒度,999个资源就会闲置在那里,资源浪费。
2)细粒度:用资源的时候分配,用完了就立即回收资源,启动会麻烦一点,启动一次分配一次,会比较麻烦。
10、spark on yarn Cluster 模式下,ApplicationMaster和driver是在同一个进程么?
是,driver 位于ApplicationMaster进程
中。该进程负责申请资源,还负责监控程序、资源的动态情况。
11、运行在yarn中Application有几种类型的container?
1)运行ApplicationMaster的Container:这是由ResourceManager(向内部的资源调度器)申请和启动的,用户提交应用程序时,可指定唯一的ApplicationMaster所需的资源;
2)运行各类任务的Container:这是由ApplicationMaster向ResourceManager申请的,并由ApplicationMaster与NodeManager通信以启动之。
12、Yarn中的container是由谁负责销毁的,在Hadoop Mapreduce中container可以复用么?
ApplicationMaster负责销毁,在Hadoop Mapreduce不可以复用,在spark on yarn程序container可以复用。
13、描述Yarn执行一个任务的过程?
1)客户端client向ResouceManager提交Application,ResouceManager接受Application并根据集群资源状况选取一个node来启动Application的任务调度器driver(ApplicationMaster)
。
2)ResouceManager找到那个node,命令其该node上的nodeManager来启动一个新的 JVM进程运行程序的driver(ApplicationMaster)
部分,driver(ApplicationMaster)
启动时会首先向ResourceManager注册,说明由自己来负责当前程序的运行。
3)driver(ApplicationMaster)开始下载相关jar包等各种资源,基于下载的jar等信息决定向ResourceManager
申请具体的资源内容。
4)ResouceManager接受到driver(ApplicationMaster)提出的申请后,会最大化的满足资源分配请求,并发送资源的元数据信息给driver(ApplicationMaster)
。
5)driver(ApplicationMaster)收到发过来的资源元数据信息后会根据元数据信息发指令给具体机器上的NodeManager,让其启动具体的container。
6)NodeManager收到driver发来的指令,启动container,container启动后必须向driver(ApplicationMaster)注册。
7)driver(ApplicationMaster)收到container的注册,开始进行任务的调度和计算,直到 任务完成。
注意:如果ResourceManager第一次没有能够满足driver(ApplicationMaster)的资源请求 ,后续发现有空闲的资源,会主动向driver(ApplicationMaster)发送可用资源的元数据信息以提供更多的资源用于当前程序的运行。
14、spark的有几种部署模式,每种模式特点?
1)本地模式
Spark不一定非要跑在hadoop集群,可以在本地,起多个线程的方式来指定。将Spark应用以多线程的方式直接运行在本地,一般都是为了方便调试,本地模式分三类
-
local
:只启动一个executor -
local[n]
:启动n个executor -
local[*]
:启动跟cpu数目相同的 executor
2)Standalone模式
分布式部署集群,自带完整的服务,资源管理和任务监控是Spark自己监控,这个模式也是其他模式的基础。
3)Spark On Yarn模式
分布式部署集群,资源和任务监控交给yarn管理,但是目前仅支持粗粒度资源分配方式,包含cluster
和client
运行模式,cluster适合生产,driver运行在集群子节点,具有容错功能,client适合调试,dirver运行在客户端。
4)Spark On Mesos模式
官方推荐这种模式(当然,原因之一是血缘关系)。正是由于Spark开发之初就考虑到支持Mesos,因此,目前而言,Spark运行在Mesos上会比运行在YARN上更加灵活,更加自然。用户可选择两种调度模式之一运行自己的应用程序:
粗粒度模式(Coarse-grained Mode)
:每个应用程序的运行环境由一个Dirver和若干个Executor组成,其中,每个Executor占用若干资源,内部可运行多个Task(对应多少个“slot
”)。应用程序的各个任务正式运行之前,需要将运行环境中的资源全部申请好,且运行过程中要一直占用这些资源,即使不用,最后程序运行结束后,回收这些资源。细粒度模式(Fine-grained Mode)
:鉴于粗粒度模式会造成大量资源浪费,Spark On Mesos还提供了另外一种调度模式:细粒度模式,这种模式类似于现在的云计算,思想是按需分配。
15、spark工作机制?
① 构建Application的运行环境,Driver创建一个SparkContext
② SparkContext向资源管理器(Standalone、Mesos、Yarn)申请Executor资源,资源管理器启动StandaloneExecutorbackend
(Executor)
③ Executor向SparkContext申请Task
④ SparkContext将应用程序分发给Executor
⑤ SparkContext就建成DAG图,DAGScheduler
将DAG图解析成Stage
,每个Stage有多个task
,形成taskset
发送给task Scheduler
,由task Scheduler
将Task发送给Executor运行
⑥ Task在Executor上运行,运行完释放所有资源
16、spark的优化怎么做?
spark调优比较复杂,但是大体可以分为三个方面来进行
1)平台层面的调优:防止不必要的jar包分发,提高数据的本地性,选择高效的存储格式如parquet
2)应用程序层面的调优:过滤操作符的优化降低过多小任务,降低单条记录的资源开销,处理数据倾斜,复用RDD进行缓存,作业并行化执行等等
3)JVM层面的调优:设置合适的资源量,设置合理的JVM,启用高效的序列化方法如kyro,增大off head内存等等
17、Spark sql为什么比hive快呢?
计算引擎不一样,一个是spark计算模型,一个是mapreudce计算模型。
18、谈谈你对container的理解?
1)Container
作为资源分配和调度的基本单位,其中封装了的资源如内存,CPU,磁盘,网络带宽等。 目前yarn
仅仅封装内存和CPU
2)Container由ApplicationMaster向ResourceManager申请的,由ResouceManager中的资源调度器异步分配给ApplicationMaster
3)Container的运行是由ApplicationMaster向资源所在的NodeManager发起的,Container运行时需提供内部执行的任务命令
19、Spark应用程序的执行过程是什么?
1)构建Spark Application的运行环境(启动SparkContext
),SparkContext向资源管理器(可以是Standalone、Mesos或YARN)注册并申请运行Executor资源;
2)资源管理器分配Executor资源并启动StandaloneExecutorBackend
,Executor运行情况将随着心跳发送到资源管理器上;
3)SparkContext构建成DAG图,将DAG图分解成Stage,并把Taskset发送给Task Scheduler。Executor向SparkContext申请Task,Task Scheduler将Task发放给Executor运行同时SparkContext将应用程序代码发放给Executor;
4)Task在Executor上运行,运行完毕释放所有资源。
20、提交任务时,如何指定Spark Application的运行模式?
1)cluster模式:./spark-submit --class xx.xx.xx --master yarn --deploy-mode cluster xx.jar
2)client模式:./spark-submit --class xx.xx.xx --master yarn --deploy-mode client xx.jar
21、不启动Spark集群Master和work服务,可不可以运行Spark程序?
可以,只要资源管理器第三方管理就可以,如由yarn管理,spark集群不启动也可以使用spark;spark集群启动的是work和master,这个其实就是资源管理框架,
yarn中的resourceManager相当于master,NodeManager相当于worker,做计算是Executor,和spark集群的work和manager可以没关系,归根接底还是JVM的运行,只要所在的JVM上安装了spark就可以。
22、driver的功能是什么?
1)一个Spark作业运行时包括一个Driver进程,也是作业的主进程,具有main函数,并且有SparkContext的实例,是程序的入口点;
2)功能:负责向集群申请资源,向master注册信息,负责job作业的调度,作业的解析、生成Stage并调度Task到Executor上。包括DAGScheduler
,TaskScheduler
。
23、Spark中Worker的主要工作是什么?
主要功能:管理当前节点内存,CPU的使用状况,接收master分配过来的资源指令,通过ExecutorRunner启动程序分配任务,worker就类似于包工头,管理分配新进程,做计算的服务,相当于process服务。
需要注意的是:
1)worker会不会汇报当前信息给master,worker心跳给master主要只有workid,它不会发送资源信息以心跳的方式给mater,master分配的时候就知道work,只有出现故障的时候才会发送资源。
2)worker不会运行代码,具体运行的是Executor是可以运行具体appliaction写的业务逻辑代码,操作代码的节点,它不会运行程序的代码的。
24、为什么Spark Application在没有获得足够的资源,job就开始执行了,可能会导致什么问题发生?
会导致执行该job时候集群资源不足,导致执行job结束也没有分配足够的资源,分配了部分Executor,该job就开始执行task,应该是task的调度线程和Executor资源申请是异步的;
如果想等待申请完所有的资源再执行job的:需要将spark.scheduler.maxRegisteredResourcesWaitingTime
设置的很大;spark.scheduler.minRegisteredResourcesRatio
设置为1,但是应该结合实际考虑
否则很容易出现长时间分配不到资源,job一直不能运行的情况。
25、Executor启动时,资源通过哪几个参数指定?
1)num-executors
:是executor的数量
2)executor-memory
:是每个executor使用的内存
3)executor-cores
:是每个executor分配的CPU
26、一个task的map数量由谁来决定?
一般情况下,在输入源是文件的时候,一个task的map数量由splitSize来决定的,那么splitSize是由以下几个来决定的
goalSize = totalSize / mapred.map.tasks
inSize = max {mapred.min.split.size, minSplitSize}
splitSize = max (minSize, min(goalSize, dfs.block.size))
一个task的reduce
数量,由partition决定。
27、 谈谈你对Hadoop生态的认识。
hadoop生态主要分为三大类型,1)分布式文件系统,2)分布式计算引擎,3)周边工具
-
分布式系统:HDFS,hbase
-
分布式计算引擎:Spark,MapReduce
-
周边工具:如zookeeper,pig,hive,oozie,sqoop,ranger,kafka等
28、RDD创建有哪几种方式?
1)使用程序中的集合创建rdd
2)使用本地文件系统创建rdd
3)使用hdfs创建rdd
4)基于数据库db创建rdd
5)基于Nosql创建rdd,如hbase
6)基于s3创建rdd
7)基于数据流,如socket创建rdd
29、RDD的弹性表现在哪几点?
1)自动的进行内存和磁盘的存储切换;
2)基于Lineage的高效容错;
3)task如果失败会自动进行特定次数的重试;
4)stage如果失败会自动进行特定次数的重试,而且只会计算失败的分片;
5)checkpoint和persist,数据计算之后持久化缓存;
6)数据调度弹性,DAG task调度和资源无关;
7)数据分片的高度弹性。
30、RDD有哪些缺陷?
1)不支持细粒度的写和更新操作(如网络爬虫),spark写数据是粗粒度的。所谓粗粒度,就是批量写入数据,为了提高效率。但是读数据是细粒度的也就是说可以一条条的读。
2)不支持增量迭代计算,Flink支持
31、什么是RDD宽依赖和窄依赖?
RDD和它依赖的parent RDD(s)的关系有两种不同的类型,即窄依赖(narrow dependency)
和宽依赖(wide dependency)
1)窄依赖:指的是一个父RDD的Partition只会被一个子RDD的Partition使用(一父对一子
)
2)宽依赖:指的是一个父RDD的Partition会被多个子RDD的Partition使用(一父对多子
)
32、RDD的数据结构是怎么样的?
一个RDD对象,包含如下5个核心属性。
1)一个分区列表,每个分区里是RDD的部分数据(或称数据块)。
2)一个依赖列表,存储依赖的其他RDD。
3)一个名为compute的计算函数,用于计算RDD各分区的值。
4)分区器(可选),用于键/值类型的RDD,比如某个RDD是按散列来分区。
5)计算各分区时优先的位置列表(可选),比如从HDFS上的文件生成RDD时,RDD分区的位置优先选择数据所在的节点,这样可以避免数据移动带来的开销。
33、RDD算子里操作一个外部map,比如往里面put数据,然后算子外再遍历map,会有什么问题吗?
频繁创建额外对象,容易oom。
34、union操作是产生宽依赖还是窄依赖?
产生窄依赖。
35、窄依赖父RDD的partition和子RDD的parition是不是都是一对一的关系?
不一定,除了一对一的窄依赖,还包含一对固定个数的窄依赖(就是对父RDD的依赖的Partition的数量不会随着RDD数量规模的改变而改变),
比如join操作的每个partiion仅仅和已知的partition进行join,这个join操作是窄依赖,依赖固定数量的父rdd,因为是确定的partition关系。
36、 RDD通过Linage(记录数据更新)的方式为何很高效?
1)lazy记录了数据的来源,RDD是不可变的,且是lazy级别的,且RDD之间构成了链条,lazy是弹性的基石。
由于RDD不可变,所以每次操作就产生新的rdd,不存在全局修改的问题,控制难度下降,所有有计算链条将复杂计算链条存储下来,计算的时候从后往前回溯 900步是上一个stage的结束,要么就checkpoint。
2)记录原数据,是每次修改都记录,代价很大如果修改一个集合,代价就很小,官方说rdd是粗粒度的操作,是为了效率,为了简化,每次都是操作数据集合,写或者修改操作,都是基于集合的rdd的写操作是粗粒度的,rdd的读操作既可以是粗粒度的也可以是细粒度,读可以读其中的一条条的记录。
3)简化复杂度,是高效率的一方面,写的粗粒度限制了使用场景如网络爬虫,现实世界中,大多数写是粗粒度的场景。
37、什么是shuffle,以及为什么需要shuffle?
shuffle中文翻译为洗牌,需要shuffle的原因是:某种具有共同特征的数据汇聚到一个计算节点上进行计算。
38、简单说一下hadoop和spark的shuffle相同和差异?
1)从表现层的角度来看,两者并没有大的差别。 都是将 mapper(Spark 里是 ShuffleMapTask
)的输出进行 partition,不同的 partition 送到不同的 reducer(Spark 里 reducer 可能是下一个 stage 里的 ShuffleMapTask
,也可能是 ResultTask
)。Reducer 以内存作缓冲区,边 shuffle 边 aggregate 数据,等到数据 aggregate 好以后进行 reduce()
(Spark 里可能是后续的一系列操作)。
2)从源码的角度来看,两者差别不小。Hadoop的MapReduce 是 sort-based
,进入 combine()
和 reduce()
的 records 必须先 sort。这样的好处在于 combine/reduce() 可以处理大规模的数据,因为其输入数据可以通过外排得到(mapper 对每段数据先做排序,reducer 的 shuffle 对排好序的每段数据做归并
)。
目前的 Spark 默认选择的是 hash-based
,通常使用 HashMap 来对 shuffle 来的数据进行 aggregate,不会对数据进行提前排序。如果用户需要经过排序的数据,那么需要自己调用类似 sortByKey() 的操作。
3)从实现角度来看,两者也有不少差别。 Hadoop的 MapReduce 将处理流程划分出明显的几个阶段:map(), spill, merge, shuffle, sort, reduce()
等。每个阶段各司其职,可以按照过程式的编程思想来逐一实现每个阶段的功能。
在Spark中,没有这样功能明确的阶段,只有不同的 stage
和一系列的 transformation()
,所以 spill, merge, aggregate
等操作需要蕴含在 transformation()
中。
如果我们将 map 端划分数据、持久化数据的过程称为 shuffle write
,而将 reducer 读入数据、aggregate 数据的过程称为 shuffle read
。
那么在 Spark 中,问题就变为怎么在 job 的逻辑或者物理执行图中加入 shuffle write
和 shuffle read
的处理逻辑?以及两个处理逻辑应该怎么高效实现?
- Shuffle write由于不要求数据有序,shuffle write 的任务很简单:将数据 partition 好,并持久化。之所以要持久化,一方面是要减少内存存储空间压力,另一方面也是为了 fault-tolerance。
39、Spark的shuffle过程?
从下面三点去展开
1)shuffle过程的划分
2)shuffle的中间结果如何存储
3)shuffle的数据如何拉取过来
40、不需要排序的hash shuffle是否一定比需要排序的sort shuffle速度快?
不一定,当数据规模小,Hash shuffle
快于Sorted Shuffle
;当数据量大,sorted Shuffle会比Hash shuffle快很多,因为数量大的有很多小文件,不均匀,甚至出现数据倾斜,消耗内存大,1.x之前spark使用hash,适合处理中小规模,1.x之后,增加了Sorted shuffle,Spark更能胜任大规模处理了。
41、Sort-based shuffle的缺陷?
1)如果mapper中task的数量过大,依旧会产生很多小文件,此时在shuffle传递数据的过程中reducer段,reduce会需要同时大量的记录进行反序列化,导致大量的内存消耗和GC的巨大负担,造成系统缓慢甚至崩溃。
2)如果需要在分片内也进行排序,此时需要进行mapper段和reducer段的两次排序。
42、Spark有哪两种算子?
Transformation(转化)
算子和Action(执行、行动)
算子。
43、Spark有哪些聚合类的算子,我们应该尽量避免什么类型的算子?
在我们的开发过程中,能避免则尽可能避免使用reduceByKey、join、distinct、repartition等会进行shuffle的算子,尽量使用map类的非shuffle算子。
这样的话,没有shuffle操作或者仅有较少shuffle操作的Spark作业,可以大大减少性能开销。
44、map与flatMap的区别?
map:对RDD每个元素转换,文件中的每一行数据返回一个数组对象。
flatMap:对RDD每个元素转换,然后再扁平化。将所有的对象合并为一个对象,文件中的所有行数据仅返回一个数组对象,会抛弃值为null
的值。
45、reduceByKey是不是action?
不是,很多人都会以为是action,reduce
才是action
46、collect功能是什么,其底层是怎么实现的?
driver
通过collect
把集群中各个节点的内容收集过来汇总成结果,collect把各个节点上的数据抓过来,抓过来数据是Array型,collect对Array抓过来的结果进行合并,合并后Array中只有一个元素,是tuple类型(KV类型的)的。
47、为什么要进行序列化序列化?
可以减少数据的体积,减少存储空间,高效存储和传输数据,不好的是使用的时候要反序列化,非常消耗CPU。
48、Spark如何处理不能被序列化的对象?
将不能序列化的内容封装成object。
49、cache和pesist的区别?
cache和persist都是用于将一个RDD进行缓存的,这样在之后使用的过程中就不需要重新计算了,可以大大节省程序运行时间
1) cache只有一个默认的缓存级别MEMORY_ONLY
,cache调用了persist
,而persist可以根据情况设置其它的缓存级别;
2)executor执行的时候,默认60%做cache,40%做task操作,persist是最根本的函数,最底层的函数。
50、 cache后面能不能接其他算子,它是不是action操作?
cache可以接其他算子,但是接了算子之后,起不到缓存应有的效果,因为会重新触发cache。
cache不是action操作。
51、Spark为什么要持久化,一般什么场景下要进行persist操作?
spark所有复杂一点的算法都会有persist身影,spark默认数据放在内存,spark很多内容都是放在内存的,非常适合高速迭代,1000个步骤只有第一个输入数据,中间不产生临时数据,但分布式系统风险很高,所以容易出错,就要容错,rdd出错或者分片可以根据血统算出来,如果没有对父rdd进行persist 或者cache的化,就需要重头做。
以下场景会使用persist
1)某个步骤计算非常耗时,需要进行persist持久化
2)计算链条非常长,重新恢复要算很多步骤,很好使,persist
3)checkpoint
所在的rdd要持久化persist。checkpoint前,要持久化,写个rdd.cache
或者rdd.persist
,将结果保存起来,再写checkpoint操作,这样执行起来会非常快,不需要重新计算rdd链条了。checkpoint
之前一定会进行persist
。
4)shuffle之后要persist,shuffle要进性网络传输,风险很大,数据丢失重来,恢复代价很大
5)shuffle之前进行persist,框架默认将数据持久化到磁盘,这个是框架自动做的。
52、列出你所知道的调度器,说明其工作原理?
1)FiFo schedular
默认的调度器 先进先出
2)Capacity schedular
容量调度器,选择占用内存小 优先级高的
3)Fair schedular
公平调度器,所有job占用相同资源
53、FIFO调度模式的基本原理、优点和缺点?
基本原理:按照先后顺序决定资源的使用,资源优先满足最先来的job。第一个job优先获取所有可用的资源,接下来第二个job再获取剩余资源。
以此类推,如果第一个job没有占用所有的资源,那么第二个job还可以继续获取剩余资源,这样多个job可以并行运行,如果第一个job很大,占用所有资源,则第二job就需要等待,等到第一个job释放所有资源。
优点和缺点:
-
适合长作业,不适合短作业;
-
适合CPU繁忙型作业(计算时间长,相当于长作业),不利于IO繁忙型作业(计算时间短,相当于短作业)。
54、FAIR调度模式的优点和缺点?
所有的任务拥有大致相当的优先级来共享集群资源,spark多以轮训的方式为任务分配资源,不管长任务还是端任务都可以获得资源,并且获得不错的响应时间,对于短任务,不会像FIFO那样等待较长时间了,通过参数spark.scheduler.mode
为FAIR指定。
55、CAPCACITY调度模式的优点和缺点?
1)原理:
容量调度器支持多个队列,每个队列可配置一定的资源量,每个队列采用 FIFO 调度策略,为了防止同一个用户的作业独占队列中的资源,该调度器会对同一用户提交的作业所占资源量进行限定。
调度时,首先按以下策略选择一个合适队列:计算每个队列中正在运行的任务数与其应该分得的计算资源之间的比值(即比较空闲的队列),选择一个该比值最小的队列;
然后按以下策略选择该队列中一个作业:按照作业优先级和提交时间顺序选择,同时考虑用户资源量限制和内存限制
2)优点:
-
计算能力保证。支持多个队列,某个作业可被提交到某一个队列中。每个队列会配置一定比例的计算资源,且所有提交到队列中的作业共享该队列中的资源;
-
灵活性。空闲资源会被分配给那些未达到资源使用上限的队列,当某个未达到资源的队列需要资源时,一旦出现空闲资源资源,便会分配给他们;
-
支持优先级。队列支持作业优先级调度(默认是FIFO);
-
多重租赁。综合考虑多种约束防止单个作业、用户或者队列独占队列或者集群中的资源;
-
基于资源的调度。 支持资源密集型作业,允许作业使用的资源量高于默认值,进而可容纳不同资源需求的作业。不过,当前仅支持内存资源的调度。
56、spark hashParitioner的弊端是什么?
HashPartitioner分区的原理很简单,对于给定的key,计算其hashCode,并除于分区的个数取余,如果余数小于0,则用余数+分区的个数,最后返回的值就是这个key所属的分区ID;
弊端是数据不均匀,容易导致数据倾斜,极端情况下某几个分区会拥有rdd的所有数据。
57、RangePartitioner分区的原理?
RangePartitioner分区则尽量保证每个分区中数据量的均匀,而且分区与分区之间是有序的,也就是说一个分区中的元素肯定都是比另一个分区内的元素小或者大;但是分区内的元素是不能保证顺序的。
简单的说就是将一定范围内的数映射到某一个分区内。其原理是水塘抽样。
RangePartitioner作用:将一定范围内的数映射到某一个分区内,在实现中,分界的算法尤为重要。算法对应的函数是rangeBounds
。
58、Spark中的HashShufle的有哪些不足?
1)shuffle产生海量的小文件在磁盘上,此时会产生大量耗时的、低效的IO操作;
2)容易导致内存不够用,由于内存需要保存海量的文件操作句柄和临时缓存信息,如果数据处理规模比较大的话,容易出现OOM;
3)容易出现数据倾斜,导致OOM。
59、 conslidate是如何优化Hash shuffle时在map端产生的小文件?
1)conslidate为了解决Hash Shuffle同时打开过多文件,导致Writer handler内存使用过大以及产生过多文件,导致大量的随机读写带来的低效磁盘IO;
2)conslidate根据CPU的个数来决定每个task shuffle map
端产生多少个文件,
假设原来有10个task,100个reduce,每个CPU有10个CPU,那么使用hash shuffle会产生10*100=1000
个文件,conslidate产生10*10=100
个文件
注意:conslidate部分减少了文件和文件句柄,并行读很高的情况下(task很多时)还是会很多文件。
60、介绍parition和block有什么关联关系?
1)hdfs中的block是分布式存储的最小单元,等分,可设置冗余,这样设计有一部分磁盘空间的浪费,但是整齐的block大小,便于快速找到、读取对应的内容;
2)Spark中的partion
是弹性分布式数据集RDD的最小单元,RDD是由分布在各个节点上的partion组成的。partion是指的spark在计算过程中,生成的数据在计算空间内最小单元,同一份数据(RDD)的partion
大小不一,数量不定,是根据application里的算子和最初读入的数据分块数量决定;
3)block位于存储空间、partion位于计算空间,block的大小是固定的、partion大小是不固定的,是从2个不同的角度去看数据。
61、hbase region多大会分区,spark读取hbase数据是如何划分partition的?
region超过了hbase.hregion.max.filesize
这个参数配置的大小就会自动裂分,默认值是1G。
默认情况下,hbase有多少个region,Spark读取时就会有多少个partition 。
62、spark.default.parallelism这个参数有什么意义,实际生产中如何设置?
1)参数用于设置每个stage的默认task数量。这个参数极为重要,如果不设置可能会直接影响你的Spark作业性能;
2)很多人都不会设置这个参数,会使得集群非常低效,你的cpu,内存再多,如果task始终为1,那也是浪费;
spark官网建议task个数为 ≈ CPU的核数*executor的个数的2~3倍
。
63、spark.shuffle.memoryFraction参数的含义,以及优化经验?
1)spark.shuffle.memoryFraction
是shuffle调优中重要参数,shuffle从上一个task拉取数据过来,要在Executor进行聚合操作,聚合操作时使用Executor内存的比例由该参数决定,默认是20%
,如果聚合时数据超过了该大小,那么就会spill到磁盘,极大降低性能;
2)如果Spark作业中的RDD持久化操作较少,shuffle操作较多时,建议降低持久化操作的内存占比,提高shuffle操作的内存占比比例,
避免shuffle过程中数据过多时内存不够用,必须溢写到磁盘上,降低了性能。
此外,如果发现作业由于频繁的gc导致运行缓慢,意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值。
64、spark.storage.memoryFraction参数的含义,实际生产中如何调优?
1)用于设置RDD持久化数据在Executor内存中能占的比例,默认是0.6
,默认Executor 60%的内存
,可以用来保存持久化的RDD数据。根据你选择的不同的持久化策略,如果内存不够时,可能数据就不会持久化,或者数据会写入磁盘;
2)如果持久化操作比较多,可以提高spark.storage.memoryFraction
参数,使得更多的持久化数据保存在内存中,提高数据的读取性能,如果shuffle的操作比较多,有很多的数据读写操作到JVM中,那么应该调小一点,节约出更多的内存给JVM,避免过多的JVM gc发生。在web ui中观察如果发现gc时间很长,可以设置spark.storage.memoryFraction
更小一点。
65、常见的数压缩方式,你们生产集群采用了什么压缩方式,提升了多少效率?
1)数据压缩,大片连续区域进行数据存储并且存储区域中数据重复性高的状况下,可以使用适当的压缩算法。
数组,对象序列化后都可以使用压缩,数更紧凑,减少空间开销。常见的压缩方式有snappy,LZO,gz等
2)Hadoop生产环境常用的是snappy压缩方式(使用压缩,实际上是CPU换IO吞吐量和磁盘空间,所以如果CPU利用率不高,不忙的情况下,可以大大提升集群处理效率)。
snappy压缩比一般20%~30%之间,并且压缩和解压缩效率也非常高(参考数据如下):
66、Spark使用parquet文件存储格式能带来哪些好处?
1)如果说HDFS是大数据时代分布式文件系统首选标准,那么parquet
则是整个大数据时代文件存储格式实时首选标准,Spark SQL默认的存储格式就是parquet
。
2)速度更快:从使用spark sql操作普通文件CSV和parquet文件速度对比上看,绝大多数情况会比使用csv等普通文件速度提升10倍左右,在一些普通文件系统无法在spark上成功运行的情况下,使用parquet很多时候可以成功运行。
3)parquet的压缩技术非常稳定出色,在spark sql中对压缩技术的处理可能无法正常的完成工作(例如会导致lost task,lost executor)但是此时如果使用parquet就可以正常的完成。
4)极大的减少磁盘I/o,通常情况下能够减少75%的存储空间,由此可以极大的减少spark sql处理数据的时候的数据输入内容,尤其是在spark1.6x中有个下推过滤器在一些情况下可以极大的减少磁盘的IO和内存的占用,(下推过滤器)。
5)spark 1.6x parquet方式极大的提升了扫描的吞吐量,极大提高了数据的查找速度spark1.6和spark1.5x相比而言,提升了大约1倍的速度,在spark1.6X中,操作parquet时候cpu也进行了极大的优化,有效的降低了cpu消耗。
6)采用parquet可以极大的优化spark的调度和执行。我们测试spark如果用parquet可以有效的减少stage的执行消耗,同时可以优化执行路径。
67、如何从Kafka中获取数据?
1)基于Receiver
的方式
这种方式使用Receiver来获取数据。Receiver是使用Kafka的高层次Consumer API来实现的。receiver从Kafka中获取的数据都是存储在Spark Executor的内存
中的,然后Spark Streaming启动的job会去处理那些数据。
2)基于Direct
的方式
这种新的不基于Receiver的直接方式,是在Spark 1.3中引入的,从而能够确保更加健壮的机制。替代掉使用Receiver来接收数据后,这种方式会周期性地
查询Kafka,来获得每个topic+partition的最新的offset
,从而定义每个batch的offset的范围。当处理数据的job启动时,就会使用Kafka的简单consumer api来
获取Kafka指定offset范围的数据。
68、导致Executor产生FULL gc的原因,可能导致什么问题?
可能导致Executor僵死问题,海量数据的shuffle
和数据倾斜
等都可能导致full gc
。
以shuffle为例,伴随着大量的Shuffle写操作,JVM的新生代不断GC,Eden Space写满了就往Survivor Space写,同时超过一定大小的数据会直接写到老生代,当新生代写满了之后,也会把老的数据搞到老生代,如果老生代空间不足了,
就触发FULL GC,还是空间不够,那就OOM错误了,此时线程被Blocked,导致整个Executor处理数据的进程被卡住。
69、Spark累加器有哪些特点?
1)累加器在全局唯一的,只增不减,记录全局集群的唯一状态;
2)在executor中修改它,在driver读取;
3)executor级别共享的,广播变量是task级别的共享,两个application不可以共享累加器,但是同一个application不同的job可以共享。
70、Hadoop中,Mapreduce操作的mapper和reducer阶段相当于spark中的哪几个算子?
相当于spark中的map算子和reduceByKey算子,当然还是有点区别的,MR会自动进行排序的,spark要看你用的是什么partitioner。
71、Spark并行度怎么设置比较合适?
spark并行度,每个core承载2~4
个partition,如 32个core,那么64~128
之间的并行度,也就是设置64~128个partion,并行读和数据规模无关,只和内存使用量和cpu使用时间有关。
72、数据本地性是在哪个环节确定的?
具体的task运行在那他机器上,dag划分stage的时候确定的
73、 Spark的数据本地性有哪几种?
Spark中的数据本地性有三种:
1)PROCESS_LOCAL
:是指读取缓存在本地节点的数据
2)NODE_LOCAL
:是指读取本地节点硬盘数据
3)ANY
:是指读取非本地节点数据
通常读取数据PROCESS_LOCAL > NODE_LOCAL > ANY
,尽量使数据以PROCESS_LOCAL
或NODE_LOCAL
方式读取。其中PROCESS_LOCAL
还和cache有关,如果RDD经常用的话将该RDD cache到内存中,注意,由于cache是lazy的,所以必须通过一个action的触发,才能真正的将该RDD cache到内存中。
74、介绍一下join操作优化经验?
join其实常见的就分为两类: map-side join
和 reduce-side join
。
当大表和小表join时,用map-side join
能显著提高效率。将多份数据进行关联是数据处理过程中非常普遍的用法,不过在分布式计算系统中,这个问题往往会变的非常麻烦,因为框架提供的 join 操作一般会将所有数据根据 key 发送到所有的 reduce 分区中去,也就是 shuffle 的过程。造成大量的网络以及磁盘IO消耗,运行效率极其低下,这个过程一般被称为 reduce-side-join
。
如果其中有张表较小的话,我们则可以自己实现在 map 端实现数据关联,跳过大量数据进行 shuffle 的过程,运行时间得到大量缩短,根据不同数据可能会有几倍到数十倍的性能提升。
75、介绍一下你对Unified Memory Management内存管理模型的理解?
Spark中的内存使用分为两部分:执行(execution)
与存储(storage)
。
执行内存主要用于shuffles、joins、sorts和aggregations
,存储内存则用于缓存
或者跨节点的内部数据传输
。1.6之前,对于一个Executor,内存都由以下部分构成:
1)ExecutionMemory
。这片内存区域是为了解决 shuffles,joins, sorts,aggregations
过程中为了避免频繁IO需要的buffer。 通过spark.shuffle.memoryFraction(默认 0.2)
配置。
2)StorageMemory
。这片内存区域是为了解决 block cache(就是你显示调用rdd.cache, rdd.persist等方法), 还有就是broadcasts,以及task results的存储。可以通过参数 spark.storage.memoryFraction(默认0.6)设置。
3)OtherMemory
。给系统预留的,因为程序本身运行也是需要内存的(默认为0.2)。
传统内存管理的不足:
1)Shuffle占用内存0.2*0.8
,内存分配这么少,可能会将数据spill
到磁盘,频繁的磁盘IO是很大的负担,Storage内存占用0.6,主要是为了迭代处理。传统的Spark内存分配对操作人的要求非常高。(Shuffle分配内存:ShuffleMemoryManager, TaskMemoryManager, ExecutorMemoryManager
)一个Task获得全部的Execution的Memory,其他Task过来就没有内存了,只能等待;
2)默认情况下,Task在线程中可能会占满整个内存,分片数据
更多推荐
所有评论(0)