王家林Spark笔记
第一讲:Scala光速入门
本期内容
1、Scala的重大价值
2、Scala基础语法入门实战
3、Scala函数入门实战
4、Scala中Array、Map、Tuple实战
5、综合案例及Spark源码解析


kafka 消息中间件
val name:String = null
import scala.math._
min(20,4)
Array(1,2,3,4)
val array = Array(1,2,3,4)
array
val array = Array.apply(1,2,3,4)
array


val age:Int = 0
val name:String = null
val age1,age2,age3 = 0


1+1
1.+(1)


import scala.math._
min(20,4)


if(age >= 18) "adult"


val:不可变变量
var:可变变量
一般情况下用val


val result = if(age>=18){}
val age = 19
res9.toInt
if(age >= 18) "adult" else "child"
val result = if(age >=18) "adult" else "child"


val result = if(age >=18){
    "adult"
    buffered = 10
    buffered
}
println("Spark")
print("\nSpark")
printf(" %s is the future of Big Data Computation Framework.\n","Spark") 格式占位符


-----------输入--读取内容:---------------------
readLine(" Please enter your password : ")
readInt
----------------------------------------------
-------循环----------------
while(element > 10){
println(element)
element -= 1
}


0 to element
for(i <- 0 to element) println(i)
for(i<-0 to element if i%2 ==0){println(i)}
for(i <- 0 to element if i%2 ==0){println(i)}
import scala.util.control.Breaks._
for(i <- 1 to 10){
if(i == 4) break
}


val n = 10
def f1:Int = {
for(i <- 1 to 20)
{
if(i == n) return 9
println(i)
}
}


val n = 10
def f1:Any = {
  for(i<- 1 to 10){
    if(i==0) return i
    println(i)
  }
}


println _
def f2 = println _


import scala.io.Source._
import scala.io._
try{
  val content = fromFile("/root/1.scala").mkString
}catch{
  case _:FileNotFoundException => println("Oops!!!File not found")
}finally{
  println("Byebye world!")
}


val arr = new Array[Int](5)
val arr1 = Array("scala","spark")


import scala.collection.mutable.ArrayBuffer
val arrBuffer = ArrayBuffer[Int]()
arrBuffer += 10
arrBuffer
arrBuffer += (12,13,14,15,16,17)
arrBuffer ++= Array(1,2,3,4)


arrBuffer.trimEnd
arrBuffer.trimStart
arrBuffer.insert(5,100) 从指定位置插入数据
arrBuffer.insert(7,1100,200,300,400,500,600)
arrBuffer.remove(10) 指定位置进行移除
arrBuffer.toArray


val arr2 = arrBuffer.toArray
arr2.toBuffer


for (elem <- arr2) println(elem)
arr2


for(i<-0 until(arr2.length,1))println(arr2(i))
for(i<-0 until(arr2.length,2))println(arr2(i))
for(i<-(0 until arr2.length).reverse) println(arr2(i))
arr2.sum
arr2.max
scala.until.Sorting.quickSort(arr2)
scala.util.Sorting.quickSort(arr2)
arr2.mkString
arr2.mkString(", ")
val arr3 = for(i <- arr2) yield i*i
val arr3 = for(i <- arr2 if i % 3 == 0) yield i*i
arr2.filter(_%3 ==0).map(i => i*i)
arr2.filter{_%3 == 0}.map{i => i*i}




def f3(param1:String,param2:Int =30) = param1 + param2
f3("Spark")
f3(param2=100,param1="Scala")
def sum(numbers: Int*) = {var result = 0;for(element <- numbers) result += element;result}
sum(1,2,3,4,5,6,7,8,9,10)
sum(1 to 100: _*)
def morning(content:String) = "Good" + content
def morning(content:String):Unit = "Good" + content






import scala.io.
val arr3 = for(i <- arr2) yield i * i
val arr3 = for(i <- arr2 if i % 3 == 0) yield i*i
val person = scala.collection.immutable.SortedMap("Spark" -> 6,"Hadoop" -> 11)
val persons = Map("Spark" ->6,"Hadoop" -> 11)


for(elem <- arr2) println(elem)


作业一:移除一个数组中第一个负数后的所有负数
val persons = scala.collection.mutable.Map("Spark"->6,"Hadoop"->11)
persons += ("Flink" -> 5)
persons -= "Flink"
val sparkValue = if(persons.contains("Spark")) persons("Spark") else 1000
val sparkValue = persons.getOrElse("Spark",1000)
for((key,value) <- persons) println(key + " : " + value)
for(key <- persons.keySet) println(key + ":")


val persons = scala.collection.immutable.SortedMap("Spark"->6,"Hadoop"->11)
val tuple = ("Spark",6,99.0)
tuple._1
tuple._2
--------------------------------------------
第二节
class HiScala{
  private var name = "Spark"
  def sayName(){println(name)}
  def getName = name
}
val scal  = new Hiscala
scal.sayName
scal.getName




------------------------------------
第三节函数式编程
def fun1(name:String){println(name)}
val fun1_v =fun1 _
fun1("Spark")
fun1_v("Spark")
fun1_v("Spark")
val fun2 = (content:String)=>println(content)




val xm="西门大官人"
val jl="金莲"
def makelove(status:Int){
  if (status == 1) println ("雄风再起")
  else println("偃旗息鼓") 
  println("金莲鄙视地说:还以为你真的那么强呢,不嗑药和大郎也差不多少啊!")
  println("西门大官人骂道:你个贱货!我不是早和你说过,老爷我今天有些感冒吗……")
}
makelove(0)






高阶函数
val xm="西门大官人"
val jl="金莲"


val say = (content:String) => println(content)


def makelove(func:(String)=>Unit,status:Int){
  if (status == 1) println ("雄风再起")
  else println("偃旗息鼓") 
  func(jl+"鄙视地说:还以为你真的那么强呢,不嗑药和大郎也差不多少啊\n"+xm+"骂道:你个贱货!我不是早和你说过,老爷我今天有些感冒吗……")
}
makelove(say,0)




abstract class Love(val man:String,val woman:String){
    def make;
}


class goodLove(man:String,woman:String) extends Love(man,woman){
    def make={
       println(s"$man:Let me drink something,....")
       println(s"$woman:  Great!")
       println(" .....wa,wa,....,(about 30000 words are omitted).....");
       println(s"$woman: Your knife is sharp as many years ago....");


    }
}


class badLove (man:String,woman:String) extends Love(man,woman){
    def make={
       println(s" $woman:Let me drink something,....")
       println(s" $man: That's ok,but I have forgotten something......")
       println(" .....wa,wa,....,(about 30 words are omitted).....");


       println(s"$woman: why you stop?!");
       println(s"$man: 你个贱人,爷今儿个感冒了,你不知道啊?!!");


    }
}


def main(status:Int){
  if(status == 1)
    new goodLove("西门庆","潘金莲").make
  else
    new badLove("西门庆","潘金莲").make


}
main(0)


------------------------
val hiScala = (content:String) => println(content)


def bigData(func:(String)=>Unit,content:String){
  func(content)
}
bigData(hiScala,"Spark")
--------------------------------
array.map(item =>2 * item)
array.map(item => 2 * item)


def func_Returned(content:String)=(message:String)=>println(message)
func_Returned("Spark")


def func_Returned(content:String) =(message:String)=>println(content + " " + message)


val returned = func_Returned("Spark")


returned("Scala")


如果在函数的函数体中只使用了一次函数的输入参数的值此时我们可以将函数的输入参数的名称省略掉用下划线来代替。
def spark(func:(String)=>Unit,name:String){func(name)}
spark((name:String) => println(name),"Scala")
spark(name => println(name),"Scala")
spark(println,"Scala")
spark(println(_),"Scala")
array.map(2*_).filter(_ > 10).foreach(println)


闭包:函数的变量超出他的有效作用域的时候,还能够对函数的内部变量进行访问。
def scala(content:String)=(message:String) => println(content + ":" + message)
val funcResult = scala("Spark")
funcResult("Flink")


sum_Curring_Better(1)(3)
(1 to 100).reduceLeft(_+_)
val list = List("Scala","Spark","Fink")
list.map("The content is :" + _)
list.map(println)
val cal = list.map("The content is :" + _)
list.flatMap(_.split)
cal
cal.flatMap(_.split(" "))
cal.flatMap(_.split(" ")).foreach(print)
list.zip(List(10,6,5))


第四节:Scala模式匹配、类型系统
def bigData(data:String){
  data match{
    case "Spark" => println("WoW!!!")
    case "Hadoop" => println("Ok")
    
    case _ if data == "Flink" => println("Cool")
    case _ => println("Something others")
  }
}


bigData("Hadoop")
bigData("Flink")


import java.io._
def exception(e:Exception){
e match{
  case fileException: FileNotFoundException => println("File not found:" + fileException)
  case _:Exception => println("Exception getting thread dump from executor SexecutorId",e)
}
}


exception(new FileNotFoundException("oop"))


def data(array:Array[String]){
  array match{
  case Array("Scala") => println("Scala")
  case Array(spark,hadoop,flink)=>println(spark + ":" + hadoop + ":" + flink)
  case Array("Spark",_*) => println("Spark ...")
  case _ => println("Unknow")
  }
}


data(Array("Scala"))
data(Array("Scala","Spark","Kafka"))


case class Person(name:String)


class Compare[T : Ordering](val n1:T,val n2:T){
  def bigger(implicit ordered:Ordering[T]) = if(ordered.compare(n1,n2) > 0) n1 else n2
}
作业:阅读Spark源码RDD hadoopRDD SparkContext Master Worker的源码,并分析里面使用的所有模式匹配和类型参数的内容。


-嵌套类------------------------
class A{class B}
val a1 = new A
val a2 = new A
val b1 = new a1.B
val b2 = new a2.B


A$B
b1.getClass
b1.getClass == b2.getClass


typeOf[a1.B] == typeOf[a2.B]


class Person[T](val content : T)


val p = new Person[String]("Spark")
p.getContent("Scala")
p.getContent(100)
val p = new Person[String(2.3)]


ViewBounds 语法 <%


第五节:Scala隐式转换和并发编程


val result = 3 * Fraction(4,5)




import scala.math.abs


class Fraction(n: Int, d: Int) {
  private val num: Int = if (d == 0) 1 else n * sign(d) / gcd(n, d);
  private val den: Int = if (d == 0) 0 else d * sign(d) / gcd(n, d);


  override def toString = num + "/" + den


  def sign(a: Int) = if (a > 0) 1 else if (a < 0) -1 else 0


  def gcd(a: Int, b: Int): Int = if (b == 0) abs(a) else gcd(b, a % b)


  def +(other:Fraction):Fraction={
    newFrac((this.num * other.den) + (other.num * this.den),this.den * other.den)
  }


  def -(other:Fraction):Fraction={
    newFrac((this.num * other.den) - (other.num * this.den),this.den * other.den)
  }


  def *(other:Fraction):Fraction={
    newFrac(this.num * other.num,this.den * other.den)
  }


  def /(other:Fraction):Fraction={
    newFrac(this.num * other.den,this.den * other.num)
  }


  private def newFrac(a:Int,b:Int):Fraction={
    val x:Int = if (b == 0) 1 else a * sign(b) / gcd(a, b);
    val y:Int = if (b == 0) 0 else b * sign(b) / gcd(a, b);
    new Fraction(x,y)
  }
}


object Test extends App{
  val f = new Fraction(15,-6)
  val p = new Fraction(20,60)
  println(f)
  println(p)
  println(f + p)
  println(f - p)
  println(f * p)
  println(f / p)
}




class Level(val level:Int)
def toWorker(name:String)(implicit level : Level){
  println(name + ":" + level)
}
implicit val level = new Level(8)
toWorker("Spark")




implicit val level = new Level(8)
class Level(val level:Int)
def toWorker(name:String)(implicit l:Level) = println(name  + ":" + l.level)


import scala.actors.Actor
class HiActor extends Actor{
  def act(){
    while(true){
      receive{
        case name:String => println(name)
      }
    } 
  }
}
val actor = new HiActor
actor.start()
actor ! "Spark"


case class Basic(name: String,age: Int)
case class Worker(name: String,age: Int)


class basicActor extends Actor{
  def act(){
  while(true){
    receive{
      case Basic(name,age) => println("Basic Information:" + name + " : " + age)
      case Worker(name,age) => println("Basic Information:" + name + " : " + age)
    }
  }
  }
}


val b = new basicActor
b.start
b ! Basic("Scala",13)
b ! Worker("Spark",7)
val result = b !? Worker("Spark",7)


sc.textFile("hdfs://192.168.1.30:9000/spark/input/access_2013_05_30.log").flatMap(_.split(" ")).map(word => (word,1)).reduceByKey(_+_).map(pair=>(pair._2,pair._1)).sortByKey(false).map(pair=>(pair._2,pair._1)).saveAsTextFile("hdfs://192.168.1.30:9000/spark/output4/")


---------------------------------------
第十二课Spark集群工作原理
Spark高可用HA实战
-----------------------------------------------------------------------------------------------------
               |---------------------------------------|
               |                zookeeper              |               
      |---------------------------------------|
^                            ^
|                            |
|                            |
 Driver     <---->Master(active)               Master(standby)
^                    ^
|     |
|                    |
|                    V
| worker           worker
|   ^                 ^
|   |                 |
|   |                 |
|   v                 V
v executor         executor
-------------------------------------------------------------------------------------------
2014年6月以前都是 两台active  standby集群资源分配
以后都是三台:一台active 两台standby 通过zookeeper选出leader
zookeeper:包含的元数据有Worker、Driver、Application。


切换Master:程序在运行之前已经向Master申请过资源 Driver和executor进行通信 这种情况下不需要Master参与。除非executor出现故障。
弊端:
粗粒度:
 优点:一次性资源的分配后,不用关心资源的分配。而让Drive和executor进行交互完成作业。
 弊端:Job 一百万个任务 有一个没有完成就等待在哪里。资源不会释放,闲置在那里。
细粒度:
 优点:你有这个计算资源就分配给计算任务。
 弊端:任务启动慢、没有办法复用。
一般都是使用粗粒度。


spark-shell --master spark://master:7077,slave1:7077,slave2:7077
-------------------------------------------------------------------------------------------------------------------------------------
第十三课:Spark内核架构
Spark Runtime




Driver  <------------->  Worker(RAM、Input Data)
        results tasks
        <------------->
--------------------------------------------------------------------------------------------------------------------------
Driver部分的代码:SparkConf + SparkContext
Drive运行Application的main函数,创建的SparkContext是整个程序运行调度的核心,SparkContext要有高层调度器DagScheduler、底层调度器TestScheduler,也有SchedulerBackend 
向Master注册程序,注册成功后,Master会分配资源,根据action触发的job job里有一系列的RDD
从后向前推发现如果是宽依赖:发放给不同的stage,stage发放给底层调度器TestScheduler
一般表示standalone模式


应用程序有两个层面:
Application = driver + executor
Driver部分的代码:SparkConf + SparkContext
Worker管理当前节点的计算资源并接受Master指令来分配具体的计算资源Executor(在新的进程中分配)
ExecutorRunner


spark优势:
1、基于内存计算
2、调度和容错


窄依赖:一对一的,固定个数的依赖


stage:计算逻辑完全一样只是计算的数据不同。
问题:一个partition是否精准的等于一个block大小?不是


一个Application里可以有多个job。
checkpoint也可以导致Job


专门用来提交Spark程序,这台机器一般一定和Spark Cluster在同样的网络环境中(Driver频繁和Executor通信),且其配置和普通的worker一致
Application(各种依赖的外部资源,例如:*.soFile),使用sparkSubmit去运行程序(可以配置运行时候的各种参数,例如memory cores。。。)实际生产环境下写Shell脚本自动化配置
和提交程序,当然当前的机器一定要安装了Spark,只不过是这里安装的Spark不属于集群。


Driver(核心是SparkContext)
--supervise  当Driver挂掉后,集群可重新启动Driver。
SparkContent:创建DAGScheduler、TaskScheduler、SchedulerBackend
在实例化过程中Register注册当前程序的Master,Master接受注册,如果没有问题,Master合为当前程序分配Appid并分配计算资源。
一般情况下通过action触发job时SparkConext会通过DAGScheduler来把Job中的RDD构成的DAG划分成不同的Stage,每个Stage内部是一系列业务逻辑完全相同但是处理数据不同的Task,构成了
TaskSet
TaskScheduler和SchedulerBackend负责具体Task的运行(遵循数据本地性)


--------------------------------------------------------------------------------------------------------------------
Spark的程序的运行有两种模式:Client Cluster
Spark Cluster
Master:接受用户提交的作业并发送指令给Worker为当前程序分配计算资源。每个Worker所在节点默认为当前程序分配一个Executor,在Executor中通过线程池并发执行
Master通知Worker接受要求启动Executor
Worker Node 
Worker进程,通过一个Prox为ExecutorRunner的对象实例来远程启动ExecutorBackend进行
ExecutorBackend进程里面有Executor
实际在工作的时候通过TaskRunner来封装Task,然后从ThreadPool中获取一条线程执行Task,执行完后线程被回收复用。


ThreadPool
最后一个Stage中Task称为ResultTask,产生Job的结果,其他前面的Stage中的Task都是ShuffleMapTask,为下一阶段的Stage做数据准备,相当于MapReduce中的Mapper


整个Spark程序的运行,就是DAGScheduler把Job划分成不同的Stage,提交TaskSet的TaskScheduler,进而提交给Executor执行(符合数据本地性),每个Task会计算RDD中的一个
Partition,基于该Partition来具体执行我们定义的一系列同一个Stage内部函数,依次类推直到整个程序运行完成。
1、spark-env.sh spark-default.sh
2、spark-submit提供的参数
3、程序中SparkConf配置的参数
----------------------------------------------------------------------------------------------------------------
第十四课 RDD解密
1、RDD:基于工作集的应用抽象
2、RDD内幕
3、RDD思考
基于数据流不适合的场景
1、不适合大量的迭代
2、交互式查询
重点是:基于数据流的方式,不能够复用曾经的结果或者中间计算结果
RDD是基于工作集的
RDD:Resillient Distributed Dataset
弹性之一:自动的进行内存和磁盘数据存储的切换
弹性之二:基于Lineage的高效容错性
弹性之三:Task如果失败会自动进行特定次数的重试
弹性之四:Stage如果失败会自动进行特定次数的重试,而且只会只计算失败的分片
弹性之五:checkpoint和persist
弹性之六:数据调度弹性:DAG、TASK和资源、管理无关
弹性之七:数据分片的高度弹性


RDD:是分布式函数式编程的抽象
RDD通过记录数据更新的方式为何很高效。
1、RDD不可变的 + lazy
2、RDD是粗粒度的


RDD写是粗粒度的
但是RDD的操作可以使粗粒度也可以是细粒度的。
Spark要统一数据计算领域,除了实时事务性处理


RDD不支持细粒度的写操作以及增量迭代计算
--------------------------------------------------
第十五课 RDD创建内幕
第一个RDD:代表了Spark应用程序输入数据的来源
通过Tranformation来对RDD进行各种算子的转换
1、使用程序中的集合创建RDD
2、使用本地文件系统创建RDD
3、使用HDFS创建RDD
4、基于DB创建RDD
5、基于NoSql,例如Hbase
6、基于S3创建RDD
7、基于数据流创建RDD


1、通过集合创建RDD的实际意思:测试
2、使用本地文件系统创建RDD的作用,
3、使用HDFS来创建RDD 生产环境最常用的RDD创建方式


实例:基于集合来创建RDD
object RDDBasedOnCollections{
  def main(args: Array[String]){
    val conf = new SparkConf() //创建SparkConf对象
        conf.setAppName("RDDBasedOnCollections")
        conf.setMaster("local")
    val sc = new SparkContext(conf)
    //val number = 1 to 100
    //val rdd = sc.parallelize(number)
    //val sum = rdd.reduce(_ + _)
    //println("1+2+3...+99" + sum)


    val rdd = sc.textFile("D://data//SogouQ//")
    val linelength = rdd.map(line =>line.length)
    val sum = linelength.reduce(_ + _)
    println("the total=" + sum)
  }
}
Local模式 默认情况下如果失败了就是失败了
实际上Spark的并行度到底应该设置多少呢?
每个Core可以承载2-4个partition 64-128之间
----------------------------------------
第十六课 RDD实战
1、RDD实战
2、RDD的Transformation与Action
3、RDD执行手动绘图


action触发job shuffle触发stage


Transformations and Actions
|---------------------------------------------------------------------------------------------
|Transformations  map(f:T=>U)            : RDD[T] => RDD[U]
|                 filter(f:T=>Bool)      : RDD[T] => RDD[T]
|                 flatMap(f:T => Seq[U]) : RDD[T] => RDD[U]
|                 sample(raction: Float) : RDD[T] = > RDD[T](Deterministic sampling)
| groupByKey()           : RDD(K,V) => RDD[(K,Seq[V])]
| reduceByKey(f:(V,V)=>V) : RDD(K,V)=>RDD[(K,V)]
| union()                : (RDD[T],RDD[T])=>RDD[T]
| join()                 :(RDD|(K,V),RDD(K,W)) => RDD[(K,(V,W))]
| cogroup()              : (RDD[K,V],RDD[K,W]) => RDD[K,(Seq[V],Seq[W])]
| crossProduct()         : (RDD[T],RDD[U]) => RDD[(T,U)]
| mapValues(f:V => W)    : RDD[(K,V)](Preserves partitioning)
| sort(c:Comparator[K])  : RDD[(K,V)]=>RDD[(K,V)]
| partitionBy(p:Partitioner[K]): RDD[(K,V)]=>RDD[(K,V)]
|Actions          count()                : RDD[T] => Long
|                 collect()              : RDD[T] =>Seq[T]
| reduce(f:(T:T)=>T)     : RDD[T] => T
| lookup(k:K)            : RDD[(K,V)]=>Seq[V](On hash/range partitioned RDDs)
| save(path : String)    : Output RDD to a storage system.e.g.HDFS
|-----------------------------------------------------------------------------------------------


-----------------------------------------------------
第十七课
1、map、filter、flatMap操作回顾
2、reduceByKey、groupByKey
3、join、cogroup


object Tranformations{
  def main(args: Array[String]){
    val sc = new SparkContext("Tranformation Operation")
    mapTranformation(sc)
    filterTranformation(sc)
    flatMapTranformation(sc)


    groupByKeyTranformations(sc)
    reduceByKeyTranformation(sc)


    sc.stop()
  }


  def sparkContext(name:String)={
    val conf = new SparkConf().setAppName("Tranformation").setMaster("local")
    val sc = new SparkContext(conf)


  }


  def mapTranformation(sc:SparkContext){
    val nums = sc.parallelize(1 to 10)
    val mapped = nums.map(item => 2*item)
    mapped.collect.foreach(println) //收集计算结果并循环打印
  }


  def filterTransformation(sc:SparkContext){
    val nums = sc.parallelize(1 to 20)
    val filtered = nums.filter(item => item%2 ==0)
    filtered.collect.foreach(println)
  }


  def flatMapTranformation(sc: SparkContext){
    val bigData = Array("Scala Spark","Java Hadoop","Java Tachyon")//实例化字符串类型的Array
    val bigDataString = sc.parallelized(bigData) //创建以字符串为元素类型的ParallelCollectionRDD
    val words = bigDataString.flatMap(line => line.split(" "))
    words.collect.foreach(println)
  }


  def groupByKeyTranformation(sc:SparkContext){
    val lines = sc.textFile("")
    val words = lines.flatMap{line => line.split(" ")}
    val pairs = words.map{word =>(word,1)}
    val wordCountsOdered = pairs.reduceByKey(_+_)
    wordCountsOdered.collect.foreach(wordNumberPair => println(wordNumberPair._1 + " : " + wordNumberPair._2))
  }


  def joinTranformation(sc: SparkContext){
    val studentNames = Array(
      Tuple2(1,"Spark"),
      Tuple2(2,"Spark"),
      Tuple2(3,"Spark"),
    )
    val studentScores = Array(
      Tuple2(1,100),
      Tuple2(2,95),
      Tuple2(3,65),
    )
    val name = sc.parallelize(studentNames)
    val scores = sc.parallelize(studentScores)


    val studentNameAndScore = name.join(scores)
    studentNameAndScore.collect.foreach(println)
  }
}


def join[W](other:RDD)[(K,W)],partitioner:Partitioner):RDD[(K,(V,W))] = self.withScope{
  this.cogroup(other,partitioner).flatMapValues(pair => for(v <- pair._1.iterator;
  w <- pair._2.iterator) yield(v,w)
  )
}


-java-实现
JavaSparkContext sc = new JavaSparkContext(conf)
List<Tuple2<Integer,String>> namesList = Arrays.asList(
  new Tuple2<Integer,String>(1,"Spark"),
  new Tuple2<Integer,String>(2,"Tachyon"),
  new Tuple2<Integer,String>(3,"Hadoop")
);


List<Tuple2<Integer,String>> scoresList = Arrays.asList(
  new Tuple2<Integer,Integer>(1,100),
  new Tuple2<Integer,Integer>(2,90),
  new Tuple2<Integer,Integer>(3,70),
  new Tuple2<Integer,Integer>(1,110),
  new Tuple2<Integer,Integer>(2,95),
  new Tuple2<Integer,Integer>(2,60),
);
JavaRDD<Tuple2<Integer,String>> names = sc.parallelizePairs(namesList);
JavaRDD<Tuple2<Integer,Integer>> scores = sc.parallelizePairs(scoresList);
names.cogroup(scores);


JavaPairRDD<Integer,Tuple2<Iterable<String>,Iterable<Integer>>> nameScores = names.cogroup(scores);
nameScores.foreach(new VoidFunction<Tuple2<Integer,Tuple2<Iterable<String>,Iterable<Integer>>>(){
  private static final long seriaVersionUID = 1L;
  public void call(Tuple2<Integer,Tuple2<Iterable<String>,Iterable<Integer>>> t) throws Exception{
    System.out.println("Student ID:" + t._1)
    System.out.println("Name:" + t._2._1)
    System.out.println("Score:" + t._2._1)
    System.out.println("==============================")
  }
})




join和cogroup是所有Spark学习者必须掌握的内容,没有任何商量的余地
---------------------------
第十八课RDD持久化、广播、累加器
val numbers = src.parllelize(1 to 100)
numbers.reduce(_+_)
val result = numbers.map(2*_)
val data = result.collect
如果想在命令终端中看到结果,就必须collect
persist 
1、某步骤计算特别好使
2、计算链条特别长的情况
3、checkpoint要在RDD也一定要持久化数据
4、shuffle之后
5、shuffle之前(框架默认帮助我们把数据持久化到本地磁盘)


cache之后不能有其他算子
persist是lazy级别的
unpersist是eager级别的


广播是由Driver发给前Application分配的所有Executor内存级别的只读变量。
executor中的线程池中线程共享该全局变量,极大的减少了网络传输(否则的话每个Task都要传输一次该变量)并极大的节省了内存,
当然也隐形的提高的CPU的有效工作。




累加器:Accumulator:对于Executor只能修改但不可读,只对Driver可读。
val sum = sc.accumulator(0)
val data = sc.parallelize(1 to 100)
data.foreach(item => sum += item)
val result = data.foreach(item => sum += item)
println(sum)
-------------------------------------------------------------
第十九课 Spark高级排序
public class SecondarySortApp{
  public static void main(String[] args){
    SparkConf conf = new SparkConf().setAppName("SecondarySortApp").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaRDD<String> lines = sc.textFile("")
    JavaPairRDD<SecondarySortKey,String> pairs = lines.mapToPair(new PairFunction<String,SecondarySortKey,String>){
      private static final long serialVersionUID = 1L;
      public Tuple2<SecondarySortKey,String> call(String line) throws Exception{
        String[] splited = line.split(" ")
SecondarySortKey key = new SecondarySortKey(Integer.valueOf(splited[0]),Integer.valueOf(splited[1]));
return new Tuple2<SecondarySortKey,String>(key,line);
      }
    }


    JavaPairRDD<SecondarySortKey,String> sorted = pairs.sortByKey(); //完成二次排序
    //过滤掉排序后自定的Key,保留排序的结果
    JavaRDD<String>
    sorted.map(new Function<Tuple2<SecondarySortKey,String>,String>(){
      public String call(Tuple2<SecondSortKey,String> sortedContent) throws Exception{
        return sortedContent._2;
      }
    })
  }


}


--scala实现--
class SecondarySortKey(val first:Int,val second:Int) extends Ordered[SecondarySortKey] with Serializable{
  def compare(other:SecondarySortKey):Int={
    if(this.first - other.first != 0){
      this.first -other.first
    }else{
      this.second - other.second
    }
  }
}


object SecondarySortApp{
  def main(args:Array[String]){
    val conf = new SparkConf().setAppName("SecondarySortApp").setMaster("local")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("")
    val pairWithSortKey = lines.map(line =>(
      new SecondarySortKey(line.split(" ")(0).toInt,line.split(" ")(1).toInt),line
    ))


    val sorted = pairWithSortKey.sortByKey(false)
    val sortedResult = sorted.map(sortedLine => sortedLine._2)
    sortedResult.collect().foreach(println)
  }
}
-----------------------------------------
第二十一课 从Spark架构中透视Job


worker:负责当前节点cpu和内存资源的使用
spark-shell默认情况下没有任何的Job
默认的资源分配方式在每个worker上为当前程序分配一个ExecutorBackend进行,且默认情况下会最大化的使用Core和Memory
Executor会并发线程池来运行Task


CoarseGrainedExecutorBackend:里面有executor,executor会通过并发线程池线和复用的方式来执行我们的Task
在一个Executor中一次性最多能够运行多少并发的task取决于当前Executor能够使用的Cores的数量
由于线程不关心具体Task中运行什么代码,所以Task和Thread是解耦合的,所以Thread是可以被复用的。


当Spark集群启动的时候,首先启动Master进程负责整个集群资源的管理和分配并接受作业的提交且为作业分配计算计算资源,每个工作节点默认情况下都会启动一个
Worker Process来管理当前节点的Memory,CPU等计算资源并且向Master汇报Worker还能够正常工作。
Worker还能够正常工作,当用户提交作业给Master的时候,Master会为程序分配ID并且分配计算资源,默认情况下会为当前的应用程序在每个WorkerProcess下面分配一个
CoarseGranedExceptionBackend进程,该进程默认情况下会最大化的使用当前节点上的内存和CPU


我们说Worker Process管理当前节点的内存和CPU的计算资源,实质上是通过Master管理每台机器上的计算资源的。
WorkerProcess会接受Master的指令为当前要运行的应用程序来分配CoarseGranedExceptionBackend进程


Stage0是Stage1的Mapper
Stage1是Stage2的Mapper
Stage1是Stage0的reduce
Stage2是Stage1的reduce
Spark是一个更加精致和高效的MapReduce思想的具体实现
最后一个Stage里面的Task是Result Task类型
前面所有的Stage中Task的类型都是ShuffleMap Task类型


Stage里面的内容一定是在Executor中执行的!
而且Stage必须从前往后执行


Spark的一个应用程序中可以因为不同的Action产生众多的job,每个Job至少有一个Stage
--------------------------------------------------------------------------------
第二十二课 RDD的依赖关系
1、窄依赖:是指每个父RDD的一个Partition最多被子RDD的一个Partition所使用,例如:map、filter等都会产生窄依赖
2、宽依赖:是指一个父RDD的Partition会被多个子RDD的Partition所使用,例如groupByKey、reduceByKey、sortByKey等操作都会产生宽依赖


总结:如果父RDD的一个Partition被一个RDD的Partition所使用就是窄依赖,否则的话是宽依赖。如果子RDD中的Partition对父RDD的Partition依赖的数量不会随着RDD数量规模
的改变而改变的话,就是窄依赖,否则的话就是宽依赖。


特别说明:对join操作有两种情况,如果说join操作的使用每个partition仅仅和已知的Partition进行join,这次是join操作就是窄依赖,其他情况的join操作就是宽依赖
因为是确定的partition数据的依赖关系,所有就是窄依赖,得出一个推论,窄依赖不仅包含一对一的窄依赖,还包含一对固定个数的窄依赖(也就是说对父RDD的依赖的partition
的数量不会随着RDD数量规模的改变而改变)


遇到Shuffle级别的依赖关系必须计算依赖的RDD的所有Partition 并且都发生在一个Task中计算
上面两种假设的核心问题都是在遇到shuffle依赖的时候无法进行pipeline




注意:
1、从后往前推理遇到宽依赖就断开,遇到窄依赖就把当前的RDD加入到该Stage中;
2、每个Stage里面的Task数量是由该Stage中最后一个RDD的Partition的数量所决定。
3、最后一个Stage里面的任务的类型是ResulTask,前面其他所有的Stage里面的任务的类型就是ShuffleMapTask
补充:Hadoop中的MapReduce操作中的Mapper和Reducer在Spark中基本等量算子是:map、reduceByKey;


表面上是数据流动,实质上算子在流动
1、数据不动代码动
2、在一个Stage内幕算子为何会流动(Pipline)?首先是算子合并,也就是所谓的函数式编程的执行的时候最终进行函数的展开从而把一个Stage内部的多个算子合并成为一个大算子
(其内部包含了当前Stage中所有算子对数据的计算逻辑);其次是由于Tranformation操作的Lazy特性!在具体算子交给集群的Executor计算之前首先会通过Spark Framework(DAGScheduler)
进行算子的优化(基于数据本地性的pipeline)


-----------------------------------------------------------------------
第二十三课 从物理执行角度透视Spark
一、再次思考pipeline
即使采用pipeline的方式,函数f对依赖的RDD中的数据集合的操作也会有两种方式:
1、f(record),f作用于集合的每一条记录,每次只作用于一条记录;
2、f(record),f一次性作用于集合的全部数据
Spark采用是第一种方式,原因:
1、无需等待,可以最大化的使用集群的计算资源
2、减少OOM的发生;
3、最大化的有利于并发;
4、可以精准的控制每一Partition本身(Dependency)及其内部的计算(compute);
5、基于lineage的算子流动式函数式编程,节省了中间结果的产生,并且可以最快的恢复;
疑问:会不会增加网络通信?当然不会!因为在pipeline


二、思考Spark Job 具体的物理执行
  Spark Application里面可以产生1个或者多个Job,例如spark-shell默认启动的时候内部就没有Job,只是作为资源的分配程序,可以在spark-shell里面写代码产生若干个Job,
普通程序中一般而言可以有不同的Action,每个Action一般也会触发一个Job
  Spark是MapReduce思想的一种更加精致和高效的实现,MapReduce有很多具体不同的实现,例如Hadoop的MapReduce基本的计算流程如下:首先是以JVM为对象的并发的Mapper,
Mapper中map的执行会产生输出数据,输出数据会经过Pariitioner指定的规则放到LocalFileSystem中,然后在经由Shuffle、Sort、Aggregate变成Reducer中的reduce的输入,
执行reduce产生最终的执行结果。Hadoop MapReduce执行的流程虽然简单,但是过于死板,尤其是在构造复杂算法(迭代)时候非常不利于算法的实现,且执行效率极为低下!


  Spark算法构造和物理执行时最基本的核心:最大化pipeline
基于Pipeline的思想,数据被使用的时候才开始计算,从数据流动的视角来说,是数据流到计算的位置!实质上从逻辑的角度来看,是算子在数据上流动!
从算法构建的角度而言:肯定是算子作用于数据,所以是算子在数据上流动;方便算法的构建!
  
  从物理执行的角度而言:是数据流动到计算的位置;方便系统最为高效的运行!
对于pipeline而言,数据计算的位置就是每个Stage中最后的RDD,一个震撼人心的内幕真相就是:每个Stage中除了最后一个RDD算子是真实的以外,前面的算子都是假的!
由于计算的Lazy特性,导致计算从后往前回溯,形成Computing Chain,导致的结果就是需要首先计算出具体一个Stage内部左侧的RDD中本次计算依赖的Partition


三、窄依赖的物理执行内幕
  一个Stage内部的RDD都是窄依赖,窄依赖计算本身是逻辑上看是从Stage内部最左侧的RDD开始立即计算的,根据Computing Chain,数据从一个计算步骤流动到下一个结算步骤,
以此类推,直到计算到Stage内部的最后一个RDD来产生计算结果。
  Computing Chain的构建是从后往前回溯构建而成,而实际的物理计算则是让数据从前往后在算子上流动,直到流动到不能再流动位置才开始计算下一个Record。


四、宽依赖物理执行内幕
  必须等到依赖的父Stage中的最后一个RDD全部数据彻底计算完毕,才能能够经过shuffle来计算当前Stage。
-----------------
第二十四课 Spark Hash Shuffle内幕彻底解密
一:到底什么是Shuffle?
  Shuffle中文翻译为“洗牌”,需要Shuffle的关键性原因是某种具有共同特征的数据需要最终汇聚到一个计算节点上进行计算。




































Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐