Kafka + Spark Streaming has matured into a widely used architecture for real-time log collection and computation. With Kafka, the same data can both flow to HDFS for offline analysis and be consumed in real time by multiple consumers, including Spark Streaming. However, when a Spark Streaming job involves complex business statistics, implementing them in plain Scala code is cumbersome and hard for others to follow. Wouldn't it be much simpler to use SQL for the statistics inside Spark Streaming as well?

This article shows how to combine Spark SQL with Spark Streaming to compute real-time log statistics in SQL. The Spark Streaming program runs on YARN in yarn-cluster mode, with no standalone Spark cluster deployed.

Environment
Hadoop-2.6.0-cdh5.13.0 (YARN)
spark-2.1.0-bin-hadoop2.6
kafka-0.9.0+kafka2.0.1 (the previous article used kafka-0.10.0)

Real-time statistics requirement
Every 10 seconds, count the number of potential customers in each region within that 10-second window.
pom dependencies
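Stripped of Spark, the core of this requirement is a group-by-count: map each log record's entity code to its region, then tally per region. A minimal sketch in plain Scala (the field layout and the entity-to-region mapping here are illustrative assumptions, not the real dimension data):

```scala
object RegionCountSketch {
  // Hypothetical mapping from entity code to region name (illustrative only).
  val entityToRegion: Map[String, String] = Map("E01" -> "North", "E02" -> "South")

  // Count records per region: field 1 of each comma-separated line is the
  // entity code, which is mapped to a region and tallied.
  def countByRegion(lines: Seq[String]): Map[String, Int] =
    lines
      .map(_.split(","))
      .flatMap(fields => entityToRegion.get(fields(1)))
      .groupBy(identity)
      .map { case (region, hits) => (region, hits.size) }
}
```

In the real program this grouping is expressed as a SQL GROUP BY over temp views, which is exactly the simplification the article is after.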

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_${spark.artifact}</artifactId>
        <version>${spark.version}</version>
        <scope>${dependency.scope}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>${spark.version}</version>
        <scope>${dependency.scope}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.8.2.1</version>
    </dependency>
    <dependency>
        <groupId>com.oracle</groupId>
        <artifactId>ojdbc6</artifactId>
        <version>11.2.0.3</version>
        <scope>${dependency.scope}</scope>
    </dependency>
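The Kafka messages consumed by the program are comma-separated records, from which fields 0, 1, and 9 are extracted into a DapLog case class. That parsing step can be exercised standalone (the sample record below is fabricated):

```scala
// DapLog mirrors the case class in the streaming job: CIM_ID, ENTITY_CODE
// and CARD_FOUND_TIME come from fields 0, 1 and 9 of a comma-separated record.
case class DapLog(CIM_ID: String, ENTITY_CODE: String, CARD_FOUND_TIME: String)

object DapLogParser {
  def parse(line: String): DapLog = {
    val m = line.split(",")
    DapLog(m(0), m(1), m(9))
  }
}
```

Note that split-and-index parsing like this throws on records with fewer than 10 fields, so malformed messages would kill the batch; guarding with a length check is a cheap safeguard.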

Spark Streaming program code

package com.chumi.dac.sp.stream.sparksqlcount
import kafka.serializer.StringDecoder
import com.chumi.dac.sp.stream.jdbc.DBCustomerStreamRssc
import com.chumi.dac.sp.stream.utils.DateUtil
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}

/**
  * Created by LHX on 2018/8/24 14:37.
  * Read data from Kafka and join it with the Oracle data.
  */

object CustomerStreamRsscCount {
    /**
      * BroadcastWrapper, used to register and update the broadcast variable
      */
    object BroadcastWrapper {
        @volatile private var instance:Broadcast[String]=null
        def getInstance(sc: SparkContext): Broadcast[String] = {
            val point_time: String = DateUtil.getPointTime()
            if (instance == null) {
                synchronized {
                    if (instance == null) {
                        instance = sc.broadcast(point_time)
                        //initialize the database
                        DBCustomerStreamRssc.initializeHour(point_time)
                        println("==initialized global variable=="+point_time)
                    }
                }
            }
            instance
        }
        def update(sc: SparkContext, blocking: Boolean = false,hv:String): Broadcast[String] = {
            if (instance != null)
                instance.unpersist(blocking)
            instance = sc.broadcast(hv)
            println("==updated=="+hv)
            instance
        }
    }
    /**
      * SQLContextSingleton
      */
    object SQLContextSingleton {
        @transient  private var instance: SQLContext = _
        def getInstance(sparkContext: SparkContext): SQLContext = {
            if (instance == null) {
                instance = new SQLContext(sparkContext)
            }
            instance
        }
    }
    case class DapLog(CIM_ID:String, ENTITY_CODE:String, CARD_FOUND_TIME:String)

    def main(args: Array[String]) {

        def functionToCreateContext(): StreamingContext = {
            val conf = new SparkConf().setAppName("CustomerStreamRsscCount").setMaster("local[2]")
            val ssc = new StreamingContext(conf, Seconds(10))
            val sqlContext = SQLContextSingleton.getInstance(ssc.sparkContext)

            //checkpointing must be enabled to use updateStateByKey
            ssc.checkpoint("C:/tmp/checkPointPath")

            //TM_SST
            val jdbcMaps = Map("url" -> "jdbc:oracle:thin:@//IP:1521/DB",
                "user" -> "user",
                "password" -> "password",
                "dbtable" -> "dbtable",
                "driver" -> "oracle.jdbc.driver.OracleDriver")
            val jdbcDFs = sqlContext.read.options(jdbcMaps).format("jdbc").load
            jdbcDFs.createOrReplaceTempView("TM_SST")

            //TM_RSSC
            val jdbcMapc = Map("url" -> "jdbc:oracle:thin:@//IP:1521/DB",
                "user" -> "user",
                "password" -> "password",
                "dbtable" -> "dbtable",
                "driver" -> "oracle.jdbc.driver.OracleDriver")
            val jdbcDFv = sqlContext.read.options(jdbcMapc).format("jdbc").load
            jdbcDFv.createOrReplaceTempView("TM_RSSC")

            val topics = "test01" 
            val topicsSet = topics.split(",").toSet
            val brokers = "svldl072.test.com:9092,svldl073.test.com:9092,svldl077.test.com:9092"

            val kafkaParams = Map[String, String]("bootstrap.servers" -> brokers)

            //register the broadcast variable
            var broadcast: Broadcast[String] = BroadcastWrapper.getInstance(ssc.sparkContext)
            //accumulate by hour
            var hourValue = ""
            var zeroTime = ""

            val dStream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
            val value = dStream.transform(rdd => {
                //get the SQLContext singleton
                val sqlC = SQLContextSingleton.getInstance(rdd.sparkContext)
                import sqlC.implicits._
                val logDataFrame = rdd.map(w => {
                    val m: Array[String] = w._2.split(",")
                    DapLog(m(0), m(1), m(9))
                }).toDF()
                //register as a temp table
                logDataFrame.createOrReplaceTempView("TT_VW_POTENTIAL_CUSTOMER")
                val sql = "select R.RSSC_ID,R.RSSC_NAME,COUNT(1) FROM TT_VW_POTENTIAL_CUSTOMER  T join  TM_SST S on T.ENTITY_CODE = S.ENTITYCODE join TM_RSSC R ON S.RSSC_ID = R.RSSC_ID  GROUP BY R.RSSC_ID,R.RSSC_NAME"
                val data1: DataFrame = sqlC.sql(sql)
                val a =data1.rdd.map{r =>(r(1).toString,r(2).toString.toInt) }
                a
            })
            //sum the previous state with the latest 10s of data
            val addFunction = (currValues : Seq[Int],preValueState : Option[Int]) => {
                val currentSum = currValues.sum
                val previousSum = preValueState.getOrElse(0)
                //reset to zero if the global variable was updated
                if(broadcast.value != hourValue){
                    Some(currentSum)
                }else{
                    Some(currentSum + previousSum)
                }
            }
            val total: DStream[(String, Int)] = value.updateStateByKey[Int](addFunction)

            //output the accumulated totals
            total.foreachRDD(rdd=>{
                hourValue = DateUtil.getPointTime()
                zeroTime = DateUtil.getZeroTime()

                // collect the data and iterate over it
                val tuples: Array[(String, Int)] = rdd.collect()
                //if the current hour != the global hour
                if(broadcast.value!=null && broadcast.value != hourValue){
                    //insert into the database
                    DBCustomerStreamRssc.initializeHour(hourValue)
                    for (i <- 0 until tuples.length){
                        DBCustomerStreamRssc.updateTable(tuples(i)._1,tuples(i)._2,hourValue)
                    }
                    //update the global hour
                    broadcast = BroadcastWrapper.update(rdd.sparkContext, true,hourValue)
                    println("==after update=="+broadcast.value)
                }else{
                    //update the database
                    for (i <- 0 until tuples.length){
                        DBCustomerStreamRssc.updateTable(tuples(i)._1,tuples(i)._2,broadcast.value)
                    }
                }

            })
            ssc
        }
        //Recover the StreamingContext from checkpointed state if it exists; otherwise create a new one.
        //Note: this path must match the directory passed to ssc.checkpoint above.
        val context = StreamingContext.getOrCreate("C:/tmp/checkPointPath", functionToCreateContext _)
    context.start()
    context.awaitTermination()

    }
}
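The heart of the program is the update function passed to updateStateByKey: it adds the current 10-second batch onto the carried state, except when the broadcast hour has rolled over, in which case the count restarts from the current batch alone. Its behavior can be exercised without Spark (a minimal sketch; the hourChanged flag stands in for the broadcast.value != hourValue check):

```scala
object StateUpdateSketch {
  // Mirrors addFunction in the article: sum the current batch's values with
  // the previous state, or restart from the batch when the hour rolled over.
  def addFunction(hourChanged: Boolean)(currValues: Seq[Int], preValueState: Option[Int]): Option[Int] = {
    val currentSum = currValues.sum
    if (hourChanged) Some(currentSum)
    else Some(currentSum + preValueState.getOrElse(0))
  }
}
```

Returning Some in both branches means the key's state is always retained; returning None instead would drop the key from the state entirely.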

Problem encountered
Compilation error:

Error:(119, 71) type arguments [String,String,org.apache.commons.codec.StringDecoder,org.apache.commons.codec.StringDecoder] conform to the bounds of none of the overloaded alternatives of
 value createDirectStream: [K, V, KD <: kafka.serializer.Decoder[K], VD <: kafka.serializer.Decoder[V]](jssc: org.apache.spark.streaming.api.java.JavaStreamingContext, keyClass: Class[K], valueClass: Class[V], keyDecoderClass: Class[KD], valueDecoderClass: Class[VD], kafkaParams: java.util.Map[String,String], topics: java.util.Set[String])org.apache.spark.streaming.api.java.JavaPairInputDStream[K,V] <and> [K, V, KD <: kafka.serializer.Decoder[K], VD <: kafka.serializer.Decoder[V]](ssc: org.apache.spark.streaming.StreamingContext, kafkaParams: Map[String,String], topics: Set[String])(implicit evidence$19: scala.reflect.ClassTag[K], implicit evidence$20: scala.reflect.ClassTag[V], implicit evidence$21: scala.reflect.ClassTag[KD], implicit evidence$22: scala.reflect.ClassTag[VD])org.apache.spark.streaming.dstream.InputDStream[(K, V)]
             val dStream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)

Solution: change the kafka_2.11 dependency version from 0.9.0.0 to 0.8.2.1. (As the error message shows, StringDecoder was resolving to org.apache.commons.codec.StringDecoder instead of kafka.serializer.StringDecoder.)

Summary
The broadcast variable is used when filtering data by time. The overall approach: first read the Oracle tables and register them as temp views, then consume the Kafka data, reshape it into the desired result with dStream.transform(), and finally add each batch onto the previous batch's totals with updateStateByKey(). Beginners are often unfamiliar with many of the Spark Streaming methods and so don't think to use them when writing code; if this post helped you, remember to give it a like~
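The BroadcastWrapper used above is an instance of a general pattern: a lazily created, double-checked-locking singleton whose held value can be swapped at batch boundaries. The pattern can be sketched generically without Spark (SingletonHolder here is a hypothetical stand-in for the Broadcast-backed wrapper):

```scala
// Generic sketch of the BroadcastWrapper pattern: lazy, double-checked
// initialization plus an update() that replaces the held value.
class SingletonHolder[T](init: () => T) {
  @volatile private var instance: Option[T] = None

  def getInstance(): T = {
    if (instance.isEmpty) {
      synchronized {
        // Re-check inside the lock so init() runs at most once.
        if (instance.isEmpty) instance = Some(init())
      }
    }
    instance.get
  }

  def update(value: T): T = {
    instance = Some(value)
    value
  }
}
```

As in the article's wrapper, the @volatile annotation is what makes the unsynchronized first read of the double-checked lock safe.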
