Debezium:Spark Structured Streaming将Kafka的数据写入ElasticSearch
前言:1、数据流:Mysql -> Debezium -> Kafka -> Structured Streaming -> ElasticSearch;2、Mysql -> Debezium -> Kafka 的搭建参考 https://blog.csdn.net/u012551524/article/details/8425 (原文链接疑似被截断,详见下文第 2 节)
前言:
1、数据流
Mysql -> Debezium -> Kafka -> Structured Streaming -> ElasticSearch
2、Mysql -> Debezium -> Kafka 参考
Debezium:mysql connector 使用(TracyGao01 的博客,CSDN)
3、ES/Spark参考
Apache Spark support | Elasticsearch for Apache Hadoop [8.5] | Elastic (ES/Spark)
Structured Streaming Programming Guide - Spark 3.3.1 Documentation (Spark Structured Streaming)
import org.apache.spark.sql.SparkSession
import org.elasticsearch.hadoop.cfg.ConfigurationOptions
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
object StructedSteamingEsSink {

  /**
   * Reads Debezium CDC change events for topic `debezium.debezium.test` from Kafka,
   * parses the JSON envelope, extracts the post-image columns (`payload.after`),
   * and upserts them into the Elasticsearch index `test/m_retail` keyed by `id`.
   */
  def main(args: Array[String]): Unit = {
    val ss = SparkSession.builder()
      .config(ConfigurationOptions.ES_NODES, "xx.xx.xx.xx")
      .config(ConfigurationOptions.ES_PORT, "9200")
      .appName("StructedStreamingEsSink")
      .master("local")
      .getOrCreate()

    // Schema of the Debezium change-event envelope ("schema" + "payload" parts).
    val schema = new StructType()
      .add("schema", new StructType()
        .add("type", StringType)
        .add("fields", ArrayType(new StructType()
          .add("type", StringType)
          .add("fields", ArrayType(new StructType()
            .add("type", StringType)
            .add("optional", BooleanType)
            .add("field", StringType)))
          .add("optional", BooleanType)
          .add("name", StringType)
          .add("field", StringType)))
        // FIX: "optional" is a JSON boolean in the Debezium envelope; it was
        // declared StringType here while the other "optional" fields in this
        // schema use BooleanType.
        .add("optional", BooleanType)
        .add("name", StringType))
      .add("payload", new StructType()
        .add("before", new StructType()
          .add("ID", IntegerType)
          .add("NAME", StringType))
        .add("after", new StructType()
          .add("ID", IntegerType)
          .add("NAME", StringType))
        .add("source", new StructType()
          .add("version", StringType)
          .add("name", StringType)
          // FIX: "server_id" was added twice (StringType and LongType), which
          // creates duplicate field names in the struct. Debezium emits
          // server_id as int64, so only the LongType entry is kept.
          .add("server_id", LongType)
          // NOTE(review): Debezium emits ts_sec/ts_ms as epoch numbers, not
          // formatted strings, so parsing them as TimestampType via
          // timestampFormat likely yields null — confirm and switch to
          // LongType if so. Left unchanged here because neither field is
          // selected downstream.
          .add("ts_sec", TimestampType)
          .add("gtid", LongType)
          .add("file", StringType)
          .add("pos", IntegerType)
          .add("row", IntegerType)
          .add("snapshot", BooleanType)
          .add("thread", IntegerType)
          .add("db", StringType)
          .add("table", StringType)
          .add("query", StringType))
        .add("op", StringType)
        .add("ts_ms", TimestampType))

    // Timestamp format applied by from_json to TimestampType fields.
    val nestTimestampFormat = "yyyy-MM-dd'T'HH:mm:ss.sss'Z'"
    val jsonOptions = Map("timestampFormat" -> nestTimestampFormat)

    // Read the raw Debezium JSON from Kafka and parse the value column with the
    // envelope schema above.
    val df = ss.readStream.format("kafka")
      .option("subscribe", "debezium.debezium.test")
      .option("kafka.bootstrap.servers",
        "BigData-Dev-5:9092,BigData-Dev-4:9092,BigData-Dev-3:9092,BigData-Dev-2:9092")
      .option("startingOffsets", "earliest")
      .load()
      .select(from_json(col("value").cast("string"), schema, jsonOptions).alias("test_table"))

    import ss.implicits._
    // Post-image columns only. Spark's analyzer is case-insensitive by
    // default, so "id"/"name" resolve against the schema's "ID"/"NAME".
    val name = df.select($"test_table.payload.after.id", $"test_table.payload.after.name")

    // Upsert into ES, using the "id" column as the document _id.
    // NOTE(review): es.mapping.id matching is case-sensitive in ES-Hadoop;
    // confirm the selected column reaches the sink as "id" rather than "ID".
    val esOptions = Map(
      "es.write.operation" -> "upsert",
      "es.mapping.id" -> "id")

    name.writeStream.options(esOptions)
      .format("org.elasticsearch.spark.sql")
      .option("checkpointLocation", "hdfs://zt01/tmp/kafka")
      .start("test/m_retail")
      .awaitTermination()

    // Plain (non-upsert) write to ES:
    // name.writeStream.outputMode("append")
    //   .format("org.elasticsearch.spark.sql")
    //   .option("checkpointLocation","hdfs://zt01/tmp/kafka")
    //   .start("test/m_retail").awaitTermination()

    // Console sink for local testing:
    // name.writeStream.outputMode("append")
    //   .format("console").option("checkpointLocation","hdfs://zt01/tmp/kafka").start().awaitTermination()
  }
}
更多推荐
所有评论(0)