Integrating Structured Streaming with Kafka: decoupling different JSON structures

Problem: Structured Streaming reads from several Kafka topics, and the value in each topic carries a different JSON format. How can a single set of template code read from multiple topics while keeping the format-specific parsing decoupled from the streaming logic?

Approach: the Structured Streaming code that reads from Kafka is identical for every topic; only the parsing of the Kafka value, a few configuration parameters, and the SQL that processes the data differ. The parsing can therefore be abstracted away: define a Bean class for each format, deserialize the JSON into that Bean, and pass the Bean's class name together with the other settings and the SQL in through a configuration file. The shared code then applies the SQL to produce the desired result.


1. The code is as follows:
  • The CommonStructuedKafka class
package com.test

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

object CommonStructuedKafka {
  def main(args: Array[String]): Unit = {

    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.apache.kafka").setLevel(Level.WARN)

    // Read settings from the configuration file
    val masterUrl = Props.get("master", "local")
    val appName = Props.get("appName", "Test7")
    val className = Props.get("className", "")
    val kafkaBootstrapServers = Props.get("kafka.bootstrap.servers", "localhost:9092")
    val subscribe = Props.get("subscribe", "test")
    val tmpTable = Props.get("tmpTable", "tmp")
    val sparksql = Props.get("sparksql", "select * from tmp")


    val spark = SparkSession.builder()
      .master(masterUrl)
      .appName(appName)
      .getOrCreate()


    // Read the Kafka stream
    val lines = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaBootstrapServers)
      .option("subscribe", subscribe)
      .load()

    // Implicit conversions (needed for .as[String] and the Dataset map below)
    import spark.implicits._

    val values = lines.selectExpr("cast(value as string)").as[String]

    val res = values.map { value =>
      // Parse the JSON value into a List of field values via reflection
      val list = Tools.parseJson(value, className)
      // 将List转成元组
      Tools.list2Tuple7(list)
    }

    res.createOrReplaceTempView(tmpTable)

    val result = spark.sql(sparksql)

    val query = result.writeStream
      .format("console")
      .outputMode("append")
      .start()

    query.awaitTermination()
  }
}
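
For comparison: when a topic's schema is known at compile time, Spark's built-in from_json can parse the value column without any reflection. The following sketch (assuming the People fields and the lines DataFrame from the listing above) shows that variant; the reflection approach trades away this compile-time schema so that everything can be driven from a properties file.

// Sketch of the schema-based alternative; assumes `lines` and
// `import spark.implicits._` from the listing above are in scope.
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{StringType, StructType}

val peopleSchema = new StructType()
  .add("name", StringType)
  .add("age", StringType)
  .add("sex", StringType)

val parsed = lines
  .selectExpr("CAST(value AS STRING) AS json")
  .select(from_json($"json", peopleSchema).as("data"))
  .select("data.*") // columns: name, age, sex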
  • Tools: the JSON-parsing utility class
package com.test

import com.google.gson.Gson

import scala.collection.mutable


object Tools {

  // Quick local smoke test (Gson's default lenient parsing accepts the single-quoted JSON)
  def main(args: Array[String]): Unit = {
    val tools = new Tools()
    val res = tools.parse("{'name':'caocao','age':'32','sex':'male'}", "com.test.People")
    println(res)
  }

  def parseJson(json: String, className: String): List[String] = {
    val tools = new Tools()
    tools.parse(json, className)
  }

  // Convert the List into a Tuple7. Only 7 fields are handled here; more can be added
  // the same way. (Note: this is clumsy, but nothing better came to mind at the time;
  // a tidier variant is sketched after this listing.)
  def list2Tuple7(list: List[String]): (String, String, String, String, String, String, String) = {
    val t = list match {
      case List(a) => (a, "", "", "", "", "", "")
      case List(a, b) => (a, b, "", "", "", "", "")
      case List(a, b, c) => (a, b, c, "", "", "", "")
      case List(a, b, c, d) => (a, b, c, d, "", "", "")
      case List(a, b, c, d, e) => (a, b, c, d, e, "", "")
      case List(a, b, c, d, e, f) => (a, b, c, d, e, f, "")
      case List(a, b, c, d, e, f, g) => (a, b, c, d, e, f, g)
      case _ => ("", "", "", "", "", "", "")
    }
    t
  }
}


class Tools {
  // Reflect on the Bean's fully-qualified class name, parse the JSON into an instance,
  // and return its field values as a List
  def parse(json: String, className: String): List[String] = {
    val list = mutable.ListBuffer[String]()
    val gson = new Gson()
    val clazz = Class.forName(className)
    val obj = gson.fromJson(json, clazz)
    val aClass = obj.getClass
    val fields = aClass.getDeclaredFields
    fields.foreach { f =>
      val fName = f.getName
      // Case class accessors share the field's name, so this resolves the getter
      val m = aClass.getDeclaredMethod(fName)
      // Guard against null fields (a key missing from the JSON would otherwise throw an NPE)
      val value = Option(m.invoke(obj)).map(_.toString).getOrElse("")
      list.append(value)
    }
    list.toList
  }
}
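
As mentioned in the comment on list2Tuple7, the exhaustive pattern match is clumsy. A possible simplification (a sketch, not part of the original project) pads the list to length 7 and indexes it:

// Sketch: pad to 7 entries, then destructure. Behavior differs from the
// original only for lists longer than 7 elements (extras are dropped
// instead of the whole row being blanked).
def list2Tuple7(list: List[String]): (String, String, String, String, String, String, String) = {
  val p = list.padTo(7, "")
  (p(0), p(1), p(2), p(3), p(4), p(5), p(6))
}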

  • Props: the utility class for reading the configuration file
package com.test

import java.io.{FileInputStream, InputStream}
import java.nio.file.{Files, Paths}
import java.util.Properties

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object Props {
  private val prop = new Properties()

  // prop.load(null) would throw an NPE, so skip loading when no config file is found;
  // the defaults passed to Props.get then take effect
  private val initStream = getPropertyFileInputStream
  if (initStream != null) prop.load(initStream)

  /**
    * After adding --driver-java-options -DPropPath=/home/spark/prop.properties to spark-submit,
    * System.getProperty("PropPath") returns the path /home/spark/prop.properties. If spark-submit
    * specifies the location of a prop.properties file this way, the properties in that file are
    * used; otherwise the defaults passed to Props.get in the calling code apply.
    */
  private def getPropertyFileInputStream: InputStream = {
    var is: InputStream = null
    val filePath = System.getProperty("PropPath")
    if (filePath != null && filePath.length > 0) {
      if (Files.exists(Paths.get(filePath))) {
        is = new FileInputStream(filePath)
      } else {
        println(s"Config file $filePath not found locally; trying HDFS")
        val fs = FileSystem.get(new Configuration())
        if (fs.exists(new Path(filePath))) {
          val fis = fs.open(new Path(filePath))
          is = fis.getWrappedStream
        } else {
          println(s"Config file $filePath not found on HDFS either; loading failed...")
        }
      }
    } else {
      println("The PropPath configuration property is not set")
    }
    is
  }


  def get(propertyName: String, defaultValue: String): String = {
    prop.getProperty(propertyName, defaultValue)
  }


  def get(): Properties = {
    println("prop:" + this.prop)
    this.prop
  }


  def reload(): Properties = {
    val is = getPropertyFileInputStream
    if (is != null) prop.load(is) // same null guard as the initial load
    prop
  }
}
  • The People and Student classes
case class People(name: String, age: String, sex: String) extends Serializable


case class Student(name: String, age: String, sex: String, idNum: String) extends Serializable
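
To sanity-check the parse pipeline without Spark or Kafka, a small driver like the following (a hypothetical helper, not part of the project) can be run locally:

object ParseDemo {
  def main(args: Array[String]): Unit = {
    val json = """{"name":"caocao","age":"32","sex":"male"}"""
    val fields = Tools.parseJson(json, "com.test.People")
    println(fields)                    // List(caocao, 32, male)
    println(Tools.list2Tuple7(fields)) // (caocao,32,male,,,,)
  }
}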

2. Configuration files

Because list2Tuple7 returns a Tuple7, the temporary view's columns are named _1 through _7; each sparksql statement aliases the populated columns back to meaningful names.

  • people.properties
master=local
appName=Test7
className=com.test.People
kafka.bootstrap.servers=localhost:9092
subscribe=test
tmpTable=tmp
sparksql=select _1 as name, _2 as age, _3 as sex from tmp
  • student.properties
master=local
appName=Test7
className=com.test.Student
kafka.bootstrap.servers=localhost:9092
subscribe=test
tmpTable=tmp
sparksql=select _1 as name, _2 as age, _3 as sex, _4 as idNum from tmp

3. Execution

When submitting the Structured Streaming program, the configuration path must be passed as a parameter, e.g.: -DPropPath=/Users/zhangzhiqiang/Documents/test_project/comtest/src/main/resources/people.properties (supplied via --driver-java-options, as described in the Props comment above).
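
A full submission might look like the following sketch (the jar name, master, and properties path are placeholders):

spark-submit \
  --class com.test.CommonStructuedKafka \
  --master local[2] \
  --driver-java-options "-DPropPath=/home/spark/people.properties" \
  comtest.jar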

For local debugging, the same option can be added in IDEA's VM Options field.

(screenshot: the -DPropPath entry in IDEA's VM Options field)

3.1 Pass in the people.properties file, run the program, and type {"name":"caocao","age":"32","sex":"male"} into the Kafka console producer. The result shows:

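For this input, the console sink should print roughly the following (a reconstruction; the exact batch number and spacing may vary):

-------------------------------------------
Batch: 0
-------------------------------------------
+------+---+----+
|  name|age| sex|
+------+---+----+
|caocao| 32|male|
+------+---+----+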

3.2 Pass in the student.properties file, run the program, and type {"name":"caocao","age":"32","sex":"male","idNum":"1001"} into the Kafka console producer. The result shows:

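Again a reconstruction of the expected console sink output for this input:

-------------------------------------------
Batch: 0
-------------------------------------------
+------+---+----+-----+
|  name|age| sex|idNum|
+------+---+----+-----+
|caocao| 32|male| 1001|
+------+---+----+-----+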


GitHub code: the structuredstreamngdemo project
