2-1. Word Count (WC) with Spark
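1. Input data (the two words on each line are separated by a tab, which is what split("\t") in the steps below relies on; the tabs show up as spaces here):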
[root@spark0 bigdata]# pwd
/usr/local/spark-1.5.2-bin-hadoop2.6/bigdata
[root@spark0 bigdata]# more wcDemo1.txt
hadoop hive
solr redis
kafka hadoop
storm flume
sqoop docker
spark spark
hadoop spark
elasticsearch hbase
hadoop hive
spark hive
hadoop spark
[root@spark0 bigdata]#
2. Word count:
scala> val rdd=sc.textFile("/usr/local/spark-1.5.2-bin-hadoop2.6/bigdata/wcDemo1.txt").flatMap(_.split("\t")).map(x=>(x,1)).reduceByKey(_+_).collect
rdd: Array[(String, Int)] = Array((spark,5), (hive,3), (hadoop,5), (docker,1), (flume,1), (solr,1), (storm,1), (elasticsearch,1), (kafka,1), (sqoop,1), (redis,1), (hbase,1))
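The one-liner above packs four steps into a single expression. As a rough sketch, the same pipeline can be split into named intermediate values to make each stage visible. The names inputPath, lines, words, pairs, counts and result are my own and not part of the original session; the snippet assumes it is typed into spark-shell, where sc already exists.

```scala
// Run inside spark-shell, where `sc` (the SparkContext) is predefined.
// All value names below are illustrative.
val inputPath = "/usr/local/spark-1.5.2-bin-hadoop2.6/bigdata/wcDemo1.txt"

val lines  = sc.textFile(inputPath)        // RDD[String]: one element per line
val words  = lines.flatMap(_.split("\t"))  // RDD[String]: one element per word
val pairs  = words.map(word => (word, 1))  // RDD[(String, Int)]: (word, 1)
val counts = pairs.reduceByKey(_ + _)      // RDD[(String, Int)]: sum per word

// collect() is the action: it runs the job and returns an Array to the driver.
val result = counts.collect()
result.foreach(println)
```

Note that the value returned by collect is a local Array, not an RDD, which is why the shell prints Array[(String, Int)] for it.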
3. Sort the result in ascending order by key (sortByKey sorts on the word, not on the count):
scala> val rdd=sc.textFile("/usr/local/spark-1.5.2-bin-hadoop2.6/bigdata/wcDemo1.txt").flatMap(_.split("\t")).map(x=>(x,1)).reduceByKey(_+_).sortByKey().collect
rdd: Array[(String, Int)] = Array((docker,1), (elasticsearch,1), (flume,1), (hadoop,5), (hbase,1), (hive,3), (kafka,1), (redis,1), (solr,1), (spark,5), (sqoop,1), (storm,1))
4. Sort the result in descending order by key:
scala> val rdd=sc.textFile("/usr/local/spark-1.5.2-bin-hadoop2.6/bigdata/wcDemo1.txt").flatMap(_.split("\t")).map(x=>(x,1)).reduceByKey(_+_).sortByKey(false).collect
rdd: Array[(String, Int)] = Array((storm,1), (sqoop,1), (spark,5), (solr,1), (redis,1), (kafka,1), (hive,3), (hbase,1), (hadoop,5), (flume,1), (elasticsearch,1), (docker,1))
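Since sortByKey orders the pairs by word, the most frequent words do not come first in the output above. If ordering by frequency is what you want, one possible approach is the RDD sortBy method with the count as the sort key. This is a sketch rather than part of the original session; inputPath, words, counts and byCountDesc are illustrative names, and sc is assumed to be the spark-shell SparkContext.

```scala
// Run inside spark-shell, where `sc` is the predefined SparkContext.
val inputPath = "/usr/local/spark-1.5.2-bin-hadoop2.6/bigdata/wcDemo1.txt"

val words  = sc.textFile(inputPath).flatMap(_.split("\t"))
val counts = words.map(word => (word, 1)).reduceByKey(_ + _)

// sortBy takes a function that extracts the sort key; _._2 is the count.
val byCountDesc = counts.sortBy(_._2, ascending = false).collect()

// Words with the same count may appear in any relative order.
byCountDesc.take(3).foreach(println)
```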
5. Count the rows in the result (the number of distinct words):
scala> val rdd=sc.textFile("/usr/local/spark-1.5.2-bin-hadoop2.6/bigdata/wcDemo1.txt").flatMap(_.split("\t")).map(x=>(x,1)).reduceByKey(_+_).sortByKey(false).count
rdd: Long = 12
6. Save the result:
scala> val rdd=sc.textFile("/usr/local/spark-1.5.2-bin-hadoop2.6/bigdata/wcDemo1.txt").flatMap(_.split("\t")).map(x=>(x,1)).reduceByKey(_+_).sortByKey(false).saveAsTextFile("/usr/local/spark-1.5.2-bin-hadoop2.6/bigdata/wcDemo_out")
rdd: Unit = ()
scala>
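saveAsTextFile writes a directory rather than a single file: one part-* file per partition plus an empty _SUCCESS marker. As far as I know it also refuses to overwrite, so rerunning the command fails while wcDemo_out still exists. A small sketch of reading the saved output back in the same shell follows; outDir and saved are illustrative names, and sc is assumed to be the spark-shell SparkContext.

```scala
// Run inside spark-shell, where `sc` is the predefined SparkContext.
val outDir = "/usr/local/spark-1.5.2-bin-hadoop2.6/bigdata/wcDemo_out"

// Pointing textFile at the directory reads all part-* files in it.
val saved = sc.textFile(outDir)

// Each element is one saved line, e.g. the text "(spark,5)".
saved.collect().foreach(println)
```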
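7. Explanation:
For WC, each input line first has to be split into words, identical words are then placed in the same bucket, and finally the number of occurrences of the word in each bucket is counted.
flatMap turns one record into several records (one-to-many), map turns one record into exactly one other record (one-to-one), and reduceByKey puts records with the same key into one bucket and combines their values per key, here by adding them up with _+_.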
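To see the three steps without a cluster, here is a minimal analogy using plain Scala collections. It is not Spark code: groupBy followed by a per-bucket sum stands in for reduceByKey, and the two sample lines are copied from wcDemo1.txt with the tab written explicitly.

```scala
// Plain Scala collections, no Spark required.
val lines = Seq("hadoop\thive", "spark\tspark")

// flatMap: one line becomes several words (one-to-many).
val words = lines.flatMap(_.split("\t"))

// map: each word becomes exactly one (word, 1) pair (one-to-one).
val pairs = words.map(word => (word, 1))

// Stand-in for reduceByKey: bucket the pairs by word, then sum each bucket.
val buckets = pairs.groupBy(_._1)
val counts  = buckets.map { case (word, bucket) => (word, bucket.map(_._2).sum) }

println(counts)
```

The difference in Spark is that reduceByKey does the bucketing across partitions and, to my understanding, pre-combines values within each partition before shuffling them.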
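In each pipeline, the calls before the last one (flatMap, map, reduceByKey, sortByKey) are Transformation operators, which describe a new RDD rather than return a result. The final collect, count and saveAsTextFile are Action operators, which trigger the computation and either return a value to the driver or write the output.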
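The split between the two kinds of operators can be observed in the shell. The sketch below uses a tiny in-memory dataset through sc.parallelize instead of the input file; lines, counts, distinctWords and asMap are illustrative names, and sc is assumed to be the spark-shell SparkContext.

```scala
// Run inside spark-shell, where `sc` is the predefined SparkContext.
val lines = sc.parallelize(Seq("hadoop\thive", "spark\tspark"))

// Transformations: this line only defines the RDD; no word is counted yet.
val counts = lines.flatMap(_.split("\t")).map(word => (word, 1)).reduceByKey(_ + _)

// Prints the chain of RDDs (the lineage) that an action would execute.
println(counts.toDebugString)

// Actions: each of these runs a job and returns a value to the driver.
val distinctWords = counts.count()
val asMap = counts.collect().toMap
```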
8. View the result:
[root@spark0 bigdata]# cd wcDemo_out/
[root@spark0 wcDemo_out]# ll
total 4
-rw-r--r--. 1 root root 128 Dec 13 22:35 part-00000
-rw-r--r--. 1 root root 0 Dec 13 22:35 _SUCCESS
[root@spark0 wcDemo_out]# pwd
/usr/local/spark-1.5.2-bin-hadoop2.6/bigdata/wcDemo_out
[root@spark0 wcDemo_out]# more part-00000
(storm,1)
(sqoop,1)
(spark,5)
(solr,1)
(redis,1)
(kafka,1)
(hive,3)
(hbase,1)
(hadoop,5)
(flume,1)
(elasticsearch,1)
(docker,1)
[root@spark0 wcDemo_out]#