前言:

1、数据流

Mysql -> Debezium -> Kafka -> Structured Streaming -> ElasticSearch

2、Mysql -> Debezium -> Kafka 参考

Debezium:mysql connector使用_TracyGao01的博客-CSDN博客_could not initialize class io.debezium.connector.m

3、ES/Spark参考

Apache Spark support | Elasticsearch for Apache Hadoop [8.5] | Elastic (ES/Spark)

Structured Streaming Programming Guide - Spark 3.3.1 Documentation (Spark Structured Streaming)


import org.apache.spark.sql.SparkSession

import org.elasticsearch.hadoop.cfg.ConfigurationOptions

import org.apache.spark.sql.types._

import org.apache.spark.sql.functions._

object StructedSteamingEsSink {

  def main(args: Array[String]): Unit = {

    val ss = SparkSession.builder()

      .config(ConfigurationOptions.ES_NODES, "xx.xx.xx.xx")

      .config(ConfigurationOptions.ES_PORT, "9200")

      .appName("StructedStreamingEsSink")

      .master("local")

      .getOrCreate()

    //SparkSql:Debezium输出json解析

    val schema = new StructType()

      .add("schema",new StructType()

          .add("type",StringType)

          .add("fields",ArrayType(new StructType()

              .add("type",StringType)

              .add("fields",ArrayType(new StructType()

                  .add("type",StringType)

                  .add("optional",BooleanType)

                  .add("field",StringType)))

              .add("optional",BooleanType)

              .add("name",StringType)

              .add("field",StringType))

          )

        .add("optional",StringType)

        .add("name",StringType))

      .add("payload",new StructType()

          .add("before",new StructType()

              .add("ID",IntegerType)

              .add("NAME",StringType))

          .add("after",new StructType()

            .add("ID",IntegerType)

            .add("NAME",StringType))

          .add("source",new StructType()

              .add("version",StringType)

              .add("name",StringType)

              .add("server_id",StringType)

              .add("server_id",LongType)

              .add("ts_sec",TimestampType)

              .add("gtid",LongType)

              .add("file",StringType)

              .add("pos",IntegerType)

              .add("row",IntegerType)

              .add("snapshot",BooleanType)

              .add("thread",IntegerType)

              .add("db",StringType)

              .add("table",StringType)

              .add("query",StringType))

          .add("op",StringType)

          .add("ts_ms",TimestampType))

    val nestTimestampFormat = "yyyy-MM-dd'T'HH:mm:ss.sss'Z'"

    val jsonOptions =  Map("timestampFormat" -> nestTimestampFormat)

    //读kafka接收到来自于Debezium的json数据

    val df = ss.readStream.format("kafka").

      option("subscribe","debezium.debezium.test")

      .option("kafka.bootstrap.servers","BigData-Dev-5:9092,BigData-Dev-4:9092,BigData-Dev-3:9092,BigData-Dev-2:9092")

      .option("startingOffsets","earliest").load().select(from_json(col("value").cast("string"), schema, jsonOptions).alias("test_table"))

    import ss.implicits._

    val name = df.select($"test_table.payload.after.id",$"test_table.payload.after.name")

    //写入ES

    val esOptions = Map(

      "es.write.operation"      -> "upsert"

      ,"es.mapping.id"           -> "id")

    //指定参数根据指定ID更新写入ES

    name.writeStream.options(esOptions)

      .format("org.elasticsearch.spark.sql")

      .option("checkpointLocation","hdfs://zt01/tmp/kafka")

      .start("test/m_retail").awaitTermination()

    //直接写入ES

//    name.writeStream.outputMode("append")

//      .format("org.elasticsearch.spark.sql")

//      .option("checkpointLocation","hdfs://zt01/tmp/kafka")

//      .start("test/m_retail").awaitTermination()

    //控制台测试

//    name.writeStream.outputMode("append")

//      .format("console").option("checkpointLocation","hdfs://zt01/tmp/kafka").start().awaitTermination()

  }

}

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐