Structured Streaming与Kafka的整合,实现不同json结构解耦
Structured Streaming与Kafka的整合,实现不同json结构解耦问题:Structured Streaming从kafka的不同topic读取数据,每个topic的value存取的数据格式是不同的。那么怎么使用一套模版代码,分别对多个topic进行读取数据。做到解耦呢?思考:Structured Streaming读取kafka的操作是一致的,只是对kafka的value值..
Structured Streaming与Kafka的整合,实现不同json结构解耦
问题:Structured Streaming从kafka的不同topic读取数据,每个topic的value存取的数据格式是不同的。那么怎么使用一套模版代码,分别对多个topic进行读取数据。做到解耦呢?
思考:Structured Streaming读取kafka的操作是一致的,只是对kafka的value值的解析操作和一些参数配置,处理数据的sql是不一样的。可以把解析操作抽象出来处理。通过定义Bean对象,将json解析成对应的Bean,最后通过传入配置文件的方式,将对应的配置信息及sql传入,然后对数据来进行处理,得到需要的数据。
一、具体代码如下:
- CommonStructuedKafka类
package com.test
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
object CommonStructuedKafka {
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.apache.kafka").setLevel(Level.WARN)
// 读取配置文件信息
val masterUrl = Props.get("master", "local")
val appName = Props.get("appName", "Test7")
val className = Props.get("className", "")
val kafkaBootstrapServers = Props.get("kafka.bootstrap.servers", "localhost:9092")
val subscribe = Props.get("subscribe", "test")
val tmpTable = Props.get("tmpTable", "tmp")
val sparksql = Props.get("sparksql", "select * from tmp")
val spark = SparkSession.builder()
.master(masterUrl)
.appName(appName)
.getOrCreate()
// 读取kafka数据
val lines = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", kafkaBootstrapServers)
.option("subscribe", subscribe)
.load()
//隐式转换
import spark.implicits._
val values = lines.selectExpr("cast(value as string)").as[String]
val res = values.map { value =>
// 将json数据解析成list集合
val list = Tools.parseJson(value, className)
// 将List转成元组
Tools.list2Tuple7(list)
}
res.createOrReplaceTempView(tmpTable)
val result = spark.sql(sparksql)
val query = result.writeStream
.format("console")
.outputMode("append")
.start()
query.awaitTermination()
}
}
- Tools:解析json的工具类
package com.test
import com.google.gson.Gson
import scala.collection.mutable
object Tools {
def main(args: Array[String]): Unit = {
val tools = new Tools()
val res = tools.parse("{'name':'caocao','age':'32','sex':'male'}", "com.test.People")
println(res)
}
def parseJson(json: String, className: String): List[String] = {
val tools = new Tools()
tools.parse(json, className)
}
// 将List转成Tuple7元组类,这里仅仅是定义7个字段,可以定义更多字段。(ps:这种处理方式很不雅,一时也没想到好办法)
def list2Tuple7(list: List[String]): (String, String, String, String, String, String, String) = {
val t = list match {
case List(a) => (a, "", "", "", "", "", "")
case List(a, b) => (a, b, "", "", "", "", "")
case List(a, b, c) => (a, b, c, "", "", "", "")
case List(a, b, c, d) => (a, b, c, d, "", "", "")
case List(a, b, c, d, e) => (a, b, c, d, e, "", "")
case List(a, b, c, d, e, f) => (a, b, c, d, e, f, "")
case List(a, b, c, d, e, f, g) => (a, b, c, d, e, f, g)
case _ => ("", "", "", "", "", "", "")
}
t
}
}
class Tools {
// 通过传进来的Bean的全类名,进行反射,解析json,返回一个List()
def parse(json: String, className: String): List[String] = {
val list = mutable.ListBuffer[String]()
val gson = new Gson()
val clazz = Class.forName(className)
val obj = gson.fromJson(json, clazz)
val aClass = obj.getClass
val fields = aClass.getDeclaredFields
fields.foreach { f =>
val fName = f.getName
val m = aClass.getDeclaredMethod(fName)
val value = m.invoke(obj).toString
list.append(value)
}
list.toList
}
}
- Props:读取配置文件的工具类
package com.test
import java.io.{FileInputStream, InputStream}
import java.nio.file.{Files, Paths}
import java.util.Properties
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
object Props {
private val prop = new Properties()
prop.load(getPropertyFileInputStream)
/**
* 在spark-submit中加入--driver-java-options -DPropPath=/home/spark/prop.properties的参数后,
* 使用System.getProperty("PropPath")就能获取路径:/home/spark/prop.properties如果spark-submit中指定了
* prop.properties文件的路径,那么使用prop.properties中的属性,否则使用该类中定义的属性
*/
private def getPropertyFileInputStream: InputStream = {
var is: InputStream = null
val filePath = System.getProperty("PropPath")
if (filePath != null && filePath.length > 0) {
if (Files.exists(Paths.get(filePath))) {
is = new FileInputStream(filePath)
} else {
println(s"在本地未找到config文件$filePath,尝试在HDFS上获取文件")
val fs = FileSystem.get(new Configuration())
if (fs.exists(new Path(filePath))) {
val fis = fs.open(new Path(filePath))
is = fis.getWrappedStream
} else {
println(s"在HDFS上找不到config文件$filePath,加载失败...")
}
}
} else {
println(s"未设置配置文件PropPath")
}
is
}
def get(propertyName: String, defaultValue: String): String = {
prop.getProperty(propertyName, defaultValue)
}
def get(): Properties = {
println("prop:" + this.prop)
this.prop
}
def reload(): Properties = {
prop.load(getPropertyFileInputStream)
prop
}
}
- People类和Student类
case class People(name: String, age: String, sex: String) extends Serializable
case class Student(name: String, age: String, sex: String, idNum: String) extends Serializable
二、配置文件
- people.properties
master=local
appName=Test7
className=com.test.People
kafka.bootstrap.servers=localhost:9092
subscribe=test
tmpTable=tmp
sparksql=select _1 as name, _2 as age, _3 as sex from tmp
- student.properties
master=local
appName=Test7
className=com.test.Student
kafka.bootstrap.servers=localhost:9092
subscribe=test
tmpTable=tmp
sparksql=select _1 as name, _2 as age, _3 as sex, _4 as idNum from tmp
三、执行
提交Structured Streaming程序,需要加上参数,例如:-DPropPath=/Users/zhangzhiqiang/Documents/test_project/comtest/src/main/resources/people.properties
本地调试,可以Idea的VM Option选项添加。
3.1 传入people.properties文件,执行程序,在kafka生产端命令行输入{"name":"caocao","age":"32","sex":"male"}
,结果显示:
3.2 传入student.properties文件,执行程序,在kafka生产端命令行输入{"name":"caocao","age":"32","sex":"male","idNum":"1001"}
,结果显示:
github代码:structuredstreamngdemo项目
更多推荐
所有评论(0)