Flink: Blink分支 1.5.1  

https://github.com/apache/flink/tree/blink


Maven Dependency:

<dependency>
  <groupId>com.alibaba.blink</groupId>
  <artifactId>flink-scala_2.11</artifactId>
  <version>1.5.1</version>
</dependency>
<dependency>
  <groupId>com.alibaba.blink</groupId>
  <artifactId>flink-streaming-scala_2.11</artifactId>
  <version>1.5.1</version>
</dependency>
<dependency>
  <groupId>com.alibaba.blink</groupId>
  <artifactId>flink-table_2.11</artifactId>
  <version>1.5.1</version>
</dependency>
<dependency>
  <groupId>com.alibaba.blink</groupId>
  <artifactId>flink-table-common</artifactId>
  <version>1.5.1</version>
</dependency>
<dependency>
  <groupId>com.alibaba.blink</groupId>
  <artifactId>flink-json</artifactId>
  <version>1.5.1</version>
</dependency>
<dependency>
  <groupId>com.alibaba.blink</groupId>
  <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
  <version>1.5.1</version>
</dependency>

Kafka数据类型为Json,格式如下:

{
    "schema": {
        "type": "struct",
        "fields": [{
            "type": "struct",
            "fields": [{
                "type": "int32",
                "optional": false,
                "field": "id"
            }, {
                "type": "int32",
                "optional": true,
                "field": "age"
            }],
            "optional": true,
            "name": "postgres.public.test.Value",
            "field": "before"
        }, {
            "type": "struct",
            "fields": [{
                "type": "int32",
                "optional": false,
                "field": "id"
            }, {
                "type": "int32",
                "optional": true,
                "field": "age"
            }],
            "optional": true,
            "name": "postgres.public.test.Value",
            "field": "after"
        }, {
            "type": "struct",
            "fields": [{
                "type": "string",
                "optional": true,
                "field": "version"
            }, {
                "type": "string",
                "optional": false,
                "field": "name"
            }, {
                "type": "string",
                "optional": false,
                "field": "db"
            }, {
                "type": "int64",
                "optional": true,
                "field": "ts_usec"
            }, {
                "type": "int64",
                "optional": true,
                "field": "txId"
            }, {
                "type": "int64",
                "optional": true,
                "field": "lsn"
            }, {
                "type": "string",
                "optional": true,
                "field": "schema"
            }, {
                "type": "string",
                "optional": true,
                "field": "table"
            }, {
                "type": "boolean",
                "optional": true,
                "default": false,
                "field": "snapshot"
            }, {
                "type": "boolean",
                "optional": true,
                "field": "last_snapshot_record"
            }],
            "optional": false,
            "name": "io.debezium.connector.postgresql.Source",
            "field": "source"
        }, {
            "type": "string",
            "optional": false,
            "field": "op"
        }, {
            "type": "int64",
            "optional": true,
            "field": "ts_ms"
        }],
        "optional": false,
        "name": "postgres.public.test.Envelope"
    },
    "payload": {
        "before": null,
        "after": {
            "id": 452609,
            "age": 3
        },
        "source": {
            "version": "0.8.3.Final",
            "name": "postgres",
            "db": "postgres",
            "ts_usec": 1558601374145332000,
            "txId": 21371566,
            "lsn": 418263044043,
            "schema": "public",
            "table": "test",
            "snapshot": false,
            "last_snapshot_record": null
        },
        "op": "c",
        "ts_ms": 1558601374138
    }
}

示例代码:

import org.apache.flink.api.common.typeinfo.TypeInformation

import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

import org.apache.flink.table.api.{TableEnvironment, Types}

import org.apache.flink.table.descriptors.{Json, Kafka, Schema}

import org.apache.flink.types.Row

import org.apache.flink.streaming.api.scala._

 

object KafkaConsumerDemo {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val tblEnv = TableEnvironment.getTableEnvironment(env)

    tblEnv.connect(new Kafka()

      .version("0.10")

      .topic("postgres.public.test")

      .property("zookeeper.connect", "BigData-Dev-1:2181")

      .property("bootstrap.servers","BigData-Dev-1:9092")

      .startFromLatest()

    ).withFormat(new Json().deriveSchema())

      .withSchema(new Schema().field("schema",Types.ROW(

        Array("type","fields","optional","name"),

        Array[TypeInformation[_]](Types.STRING,ObjectArrayTypeInfo.getInfoFor(new Array[Row](0).getClass,

          Types.ROW(Array("type","fields","optional","name","field"),Array[TypeInformation[_]](

            Types.STRING,

            ObjectArrayTypeInfo.getInfoFor(new Array[Row](0).getClass,Types.ROW(Array("type","optional","field"),Array[TypeInformation[_]](Types.STRING,Types.STRING,Types.STRING)))

            ,Types.STRING,Types.STRING,Types.STRING)

          )),Types.STRING,Types.STRING)

      )).field("payload", Types.ROW(

        Array[String]("before", "after", "source", "op", "ts_ms"),

        Array[TypeInformation[_]](

          Types.ROW(Array[String]("id", "age"), Array[TypeInformation[_]](Types.STRING, Types.STRING)),

          Types.ROW(Array[String]("id", "age"), Array[TypeInformation[_]](Types.STRING, Types.STRING)),

          Types.ROW(Array[String]("version", "name", "db", "ts_usec", "txId", "lsn", "schema", "table", "snapshot", "last_snapshot_record"), Array[TypeInformation[_]](Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING)),

          Types.STRING, Types.STRING))))

      .inAppendMode()

      .registerTableSource("test")

    val tableResult = tblEnv.sqlQuery("select after.id,after.age from test where  after.id is not null")

    tblEnv.toAppendStream[Row](tableResult).print()

    env.execute()

  }

}

测试:

 

 

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐