RDD的本质

RDD的本质是一个函数,而RDD的变换不过是函数的嵌套.RDD有两类:

  1. 输入的RDD: 典型如KafkaRDD,JdbcRDD
  2. 转换的RDD: 如MapPartitionsRDD

 

RDD的处理流程:

以如下代码为例:

sc.textFile("abc.log").map().saveAsTextFile("")

1. textFile 会构建出一个NewHadoopRDD,

2. map函数运行后会构建出一个MapPartitionsRDD

3. saveAsTextFile触发了实际流程代码的执行

所以RDD不过是对一个函数的封装,当一个函数对数据处理完成后,我们就得到一个RDD的数据集(是一个虚拟的,后续会解释)。

 

NewHadoopRDD是数据来源,每个parition(分布式并行执行)负责获取数据,获得过程是通过iterator.next 获得一条一条记录的。假设某个时刻拿到了一条数据A,这个A会立刻被map里的函数处理得到B(完成了转换),然后开始写入到HDFS(一条一条写入)上。其他数据重复如此。所以整个过程:

  1. 理论上某个MapPartitionsRDD里实际在内存里的数据等于其Partition的数目,是个非常小的数值。
  2. NewHadoopRDD则会略多些,因为属于数据源,读取文件,假设读取文件的buffer是1M,那么最多也就是partitionNum*1M 数据在内存里
  3. saveAsTextFile也是一样的,往HDFS写文件,需要buffer,最多数据量为 buffer* partitionNum(可以汇聚到Driver端写,也可以各个Executor直接写入到HDFS)

 

所以整个过程其实是流式(一般是一条一条或者一批一批)的过程,一条数据被各个RDD所包裹的函数处理。

(Ps: 如果是mapPartition的话,那就是把整个partition的数据一起加载过来了,所以使用mapPartition函数比起map会容易造成内存溢出)

 

.map(...).map(...)..........map(...).map(...) 是嵌套调用, RDD compute 方法会调用上一个RDDcompute方法, 现在的rdd记住了自己的parent 然他们自己记住调用关系,带来的问题自然是不能嵌套太深

   对应的代码是RDD.scala中的iterator方法:

 

按上面的逻辑,内存使用其实是非常小的,10G内存跑100T数据也不是难事。但是为什么Spark常常因为内存问题挂掉呢? 我们接着往下看。

 

Shuffle的本质:

Stage是以shuffle作为分界的! Shuffle不过是偷偷的帮你加上了个类似saveAsLocalDiskFile的动作。

如果是M/R的话:

每个Stage其实就是上面说的那样,一套数据被N个嵌套的函数处理(也就是你的transform动作)。遇到了Shuffle,就被切开来。Shuffle本质上是把数据按规则临时都落到磁盘上,相当于完成了一个saveAsTextFile的动作,不过是存本地磁盘。然后被切开的下一个Stage则以本地磁盘的这些数据作为数据源,重新走上面描述的流程。

 

所以Spark的操作为:

前面我们提到,Shuffle不过是偷偷的帮你加上了个类似saveAsLocalDiskFile的动作。然而,写磁盘是一个高昂的动作。所以我们尽可能的把数据先放到内存,再批量写到文件里,还有读磁盘文件也是给费内存的动作。把数据放内存,就遇到个问题,比如10000条数据,到底会占用多少内存?这个其实很难预估的。所以一不小心,就容易导致内存溢出了。这其实也是一个很无奈的事情。

(还是说,Spark的shuffle的中间结果也是要写到本地磁盘的,只是顺序落盘和顺序读盘的话性能会快很多 -- 只写一次磁盘,而且是顺序写,那么也是非常快的。所以Spark其实是激进的使用内存)

(即进行第一次shuffle之后,数据就全部都放在内存中了? 还是都会写入到本地目录? 多次shuffle之间是如何操作的???

 对于以上的问题,后续会单独写一篇博客,理解下shuffle的逻辑)

 

Cache和Persist的含义:

    其实就是给某个Stage加上了一个saveAsMemoryBlockFile的动作,然后下次再要数据的时候,就不用算了。这些存在内存的数据就表示了某个RDD处理后的结果。这个才是说为啥Spark是内存计算引擎的地方。在MR里,你是要放到HDFS里的,但Spark允许你把中间结果放内存里(如果不加这个的话,中间结果还是要落到磁盘上的?!)

    如果前一次stage操作完成之后,RDD没有进行cache之类的操作的话,那前一次的中间结果就会删除!

 

遗留的问题是:

  1. shuffle默认是要将数据落到磁盘的么?
  2. 落的时候是一条条落? – 还是说会合并?
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐