Debezium: Real-Time Data Ingestion from Kafka to Phoenix
Goal: build an HBase-based real-time data warehouse.
Problem solved: real-time capture of RDBMS changes into HBase, written through Phoenix.
Approach: PostgreSQL -> Debezium -> Kafka -> Spark Streaming -> Phoenix
Scope: this article covers the Kafka-to-Phoenix leg. For the PostgreSQL-to-Kafka setup, see: https://blog.csdn.net/u012551524/article/details/82798066
1. Maven Dependencies
<!-- spark-streaming-kafka -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-0-10_${spark.artifact}</artifactId>
  <version>${spark.version}</version>
  <scope>${dependency.scope}</scope>
</dependency>
<!-- JSON parsing -->
<dependency>
  <groupId>com.alibaba</groupId>
  <artifactId>fastjson</artifactId>
  <version>1.2.13</version>
</dependency>
<!-- Phoenix -->
<dependency>
  <groupId>org.apache.phoenix</groupId>
  <artifactId>phoenix-spark</artifactId>
  <version>4.14.0-cdh5.11.2</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>org.apache.phoenix</groupId>
  <artifactId>phoenix-core</artifactId>
  <version>4.14.0-cdh5.11.2</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>org.apache.tephra</groupId>
  <artifactId>tephra-hbase-compat-1.2-cdh</artifactId>
  <version>0.13.0-incubating</version>
</dependency>
<dependency>
  <groupId>com.lmax</groupId>
  <artifactId>disruptor</artifactId>
  <version>3.3.6</version>
</dependency>
Note: if you write to Phoenix from Spark 2.0 or later, rebuild Phoenix yourself: change the Spark version in its pom to match your cluster, recompile and repackage, and add the resulting jar to your project.
2. Kafka Consumption Model
- Since PG-side changes must be applied to Phoenix in exactly the order they occurred, consumption order must strictly match change order. The topic therefore uses a single partition, which guarantees that messages are consumed in the same order as the PG modifications.
- Offsets are tracked during consumption and written to an offset bookkeeping table in Phoenix; on every restart, the program reads the offset for the corresponding topic|group from that table (a minimal sketch of this table follows below).
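The article never shows the DDL for that bookkeeping table, so here is a minimal sketch, assuming the `OSET` table name, `ID`/`OSET` columns, and `BigData-Dev-1:2181` ZooKeeper quorum used by the code below:

import java.sql.DriverManager

object CreateOffsetTable {
  def main(args: Array[String]): Unit = {
    // Phoenix JDBC URL built from the article's ZooKeeper quorum (assumption)
    val conn = DriverManager.getConnection("jdbc:phoenix:BigData-Dev-1:2181")
    // ID encodes "topic-group-partition" as written by the streaming job;
    // OSET holds the untilOffset recorded after each micro-batch
    conn.createStatement().execute(
      """CREATE TABLE IF NOT EXISTS OSET (
        |  ID VARCHAR PRIMARY KEY,
        |  OSET BIGINT
        |)""".stripMargin)
    conn.commit()
    conn.close()
  }
}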
3. Code Example
package spark.learning
import com.alibaba.fastjson.JSON
import com.alibaba.fastjson.serializer.SerializerFeature
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010._
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.phoenix.spark._
import org.apache.spark.streaming.dstream.InputDStream
object KafkaOffset {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("KafkaSparkStreaming")
    val ssc = new StreamingContext(conf, Seconds(20))
    val groupId = "Kafka_Direct"
    val topics = Array("fullfillment.public.test1")
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "BigData-Dev-5:9092,BigData-Dev-4:9092,BigData-Dev-3:9092,BigData-Dev-2:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> "earliest",
      // offsets are tracked in Phoenix, so Kafka auto-commit stays off
      "enable.auto.commit" -> (false: java.lang.Boolean))

    // A hard-coded starting offset would look like:
    // val formdbOffset: Map[TopicPartition, Long] = Map(new TopicPartition("fullfillment.public.test1", 0) -> 102)

    val sqlContext = new SQLContext(ssc.sparkContext)
    import sqlContext.implicits._

    // Read the consumption state (topic/partition/offset) from the Phoenix offset table
    val df = sqlContext.read
      .format("org.apache.phoenix.spark")
      .options(Map("table" -> "OSET", "zkUrl" -> "BigData-Dev-1:2181"))
      .load()
    val setList = df.filter($"ID" === topics(0) + "-" + groupId + "-" + 0)
      .select(df("ID"), df("OSET"))
      .takeAsList(1)

    // If the offset table holds an entry for this topic|group, resume from the recorded
    // offset; otherwise consume from the beginning
    if (setList.size() == 1) {
      val topicPartitionGroup = setList.get(0).apply(0)
      val offSet = setList.get(0).apply(1).asInstanceOf[Long]
      // The ID key has the form "topic-group-partition"
      val parts = topicPartitionGroup.toString.split("-")
      val formdbOffset: Map[TopicPartition, Long] =
        Map(new TopicPartition(parts(0), parts(2).toInt) -> offSet)
      // DStream that starts from the recovered offsets
      val lines = KafkaUtils.createDirectStream(
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Assign[String, String](formdbOffset.keys, kafkaParams, formdbOffset)
      )
      writeData(lines, groupId, sqlContext)
    } else {
      val lines = KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
      )
      writeData(lines, groupId, sqlContext)
    }
    ssc.start()
    ssc.awaitTermination()
  }
  // Parse the records, write the rows to the Phoenix target table, then record the offsets
  def writeData(lines: InputDStream[ConsumerRecord[String, String]], groupId: String, sqlContext: SQLContext): Unit = {
    // Extract the "after" image from each Debezium change-event JSON
    val vl = lines.map(x => {
      val json = JSON.parseObject(x.value())
      if (json.getString("schema") == null) {
        "null"
      } else {
        val after = json.getJSONObject("payload").get("after")
        JSON.toJSONString(after, SerializerFeature.WriteMapNullValue)
      }
    })
    val rowRdd = vl.filter(x => x != "null")
      .map(x => JSON.parseObject(x))
      .map(x => Row(x.getString("id"), x.getString("age")))

    object testSchema {
      val column1 = StructField("ID", StringType)
      val column2 = StructField("AGE", StringType)
      val struct = StructType(Array(column1, column2))
    }

    // Write the parsed rows into the Phoenix target table
    rowRdd.foreachRDD(rdd => {
      val testDF = sqlContext.createDataFrame(rdd, testSchema.struct)
      testDF.write
        .format("org.apache.phoenix.spark")
        .mode("overwrite")
        .option("table", "test1")
        .option("zkUrl", "BigData-Dev-1:2181")
        .save()
      // Alternatively:
      // testDF.saveToPhoenix(Map("table" -> "test1", "zkUrl" -> "BigData-Dev-5:2181,BigData-Dev-4:2181,BigData-Dev-3:2181,BigData-Dev-2:2181,BigData-Dev-1:2181"))
    })

    // Record topic, groupId, partition, and untilOffset in the Phoenix offset table
    lines.foreachRDD(rdd => {
      // Each OffsetRange carries: topic name, partition id, fromOffset, untilOffset
      val offsetRange: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      for (nt <- offsetRange) {
        val dataSet = List((nt.topic + "-" + groupId + "-" + nt.partition.toString, nt.untilOffset))
        rdd.context.parallelize(dataSet).saveToPhoenix("OSET", Seq("ID", "OSET"), zkUrl = Some("BigData-Dev-1:2181"))
      }
    })
  }
}
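For reference, here is a trimmed sketch of the Debezium change-event envelope that writeData expects. The field values are illustrative (real events carry a much larger "schema" section); only the presence of "schema" and the "payload.after" object matter to the parser above:

// Hypothetical sample of a JSON value consumed from Kafka (fields trimmed for brevity)
val sampleEvent =
  """{
    |  "schema": { "type": "struct", "name": "fullfillment.public.test1.Envelope" },
    |  "payload": {
    |    "before": null,
    |    "after": { "id": "1", "age": "30" },
    |    "op": "c"
    |  }
    |}""".stripMargin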
4. Testing
- Start the Kafka service
- Start the Debezium connector
- On the first run the offset table is empty, so consumption starts from the beginning
- Launch the application and let it start consuming
- Check the Phoenix target table and the offset table: the data arrives and the offset table is updated
- Next, update one row on the PostgreSQL side
- The change is consumed: the data table changes and the offset table advances
- Then stop the application and modify the value in the offset table so that the next run resumes from offset 100 (see the sketch after this list)
- The basic test passes
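A minimal sketch of that rewind step, assuming the ID key format and ZooKeeper quorum from the code above; RewindOffset is a hypothetical helper, not part of the original job:

import java.sql.DriverManager

// Hypothetical helper: upsert the desired restart offset before relaunching the job
object RewindOffset {
  def main(args: Array[String]): Unit = {
    val conn = DriverManager.getConnection("jdbc:phoenix:BigData-Dev-1:2181")
    conn.createStatement().execute(
      "UPSERT INTO OSET (ID, OSET) VALUES ('fullfillment.public.test1-Kafka_Direct-0', 100)")
    conn.commit()
    conn.close()
  }
}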