Flink Operations Demo
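The demo below wires Flink's Table API to Kafka: it reads comma-separated records (imsi, lac, cell) from the topic "foo", keeps only rows whose imsi starts with "460", appends an isSpecifiedLocation flag (true when lac equals cell) and a processing timestamp, and writes the matching rows as JSON to the topic "bar".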
package com.woople.streaming.scala.examples.kafka

import java.util.Properties

import com.woople.flink.streaming.connectors.kafka.{CsvRowDeserializationSchema, KafkaCsvTableSource}
import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.Kafka09JsonTableSink
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

object FlinkKafkaDemo {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // Schema of the incoming CSV records: imsi, lac, cell (all strings)
    val typeInfo = Types.ROW_NAMED(Array("imsi", "lac", "cell"), Types.STRING, Types.STRING, Types.STRING)

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "10.1.236.66:6667")
    properties.setProperty("group.id", "test")

    // Register a TableSource that reads CSV rows from the Kafka topic "foo"
    val kafkaTableSource = new KafkaCsvTableSource(
      "foo",
      properties,
      new CsvRowDeserializationSchema(typeInfo),
      typeInfo)
    tableEnv.registerTableSource("KafkaCsvTable", kafkaTableSource)
    val kafkaCsvTable = tableEnv.scan("KafkaCsvTable")

    // Keep only records whose imsi starts with "460" and convert the
    // result table back into an append-only stream of Rows
    val filterResult = kafkaCsvTable.where('imsi like "460%").select("imsi,lac,cell")
    val dsRow: DataStream[Row] = tableEnv.toAppendStream(filterResult)

    {
      // Row type of the enriched stream: the original three fields plus
      // isSpecifiedLocation (BOOLEAN) and timestamp (LONG). The implicit
      // TypeInformation is scoped to this block and picked up by map below.
      val types = Array[TypeInformation[_]](
        Types.STRING, Types.STRING, Types.STRING, Types.BOOLEAN, Types.LONG)
      val names = Array("imsi", "lac", "cell", "isSpecifiedLocation", "timestamp")
      implicit val tpe: TypeInformation[Row] = new RowTypeInfo(types, names)

      // Append two fields to every row: whether lac equals cell, and the current time
      val newDsRows = dsRow.map(row => {
        val ret = new Row(row.getArity() + 2)
        for (i <- 0 until row.getArity()) {
          ret.setField(i, row.getField(i))
        }
        val isSpecifiedLocation = ret.getField(1).equals(ret.getField(2))
        ret.setField(row.getArity(), isSpecifiedLocation)
        ret.setField(row.getArity() + 1, System.currentTimeMillis())
        ret
      })

      tableEnv.registerDataStream("newTable", newDsRows)
      val newKafkaCsvTable = tableEnv.scan("newTable")

      // Keep only the flagged rows and write them as JSON to the Kafka topic "bar"
      val newResult = newKafkaCsvTable
        .filter('isSpecifiedLocation === true)
        .select("imsi,lac,cell,isSpecifiedLocation,timestamp")
      val sink = new Kafka09JsonTableSink("bar", properties, new FlinkFixedPartitioner[Row])
      newResult.writeToSink(sink)

      env.execute("Flink kafka demo")
    }
  }
}
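Note that CsvRowDeserializationSchema and KafkaCsvTableSource are custom classes from the author's com.woople package, not part of the Flink distribution; presumably KafkaCsvTableSource wraps a FlinkKafkaConsumer09 in a StreamTableSource using this schema. As a rough illustration only, here is a minimal sketch of what the deserialization schema might look like, assuming each Kafka message is a single comma-separated line whose fields are all kept as strings. It targets the DeserializationSchema interface from org.apache.flink.api.common.serialization (on Flink 1.3 the same interface lives in org.apache.flink.streaming.util.serialization); everything beyond the class name and constructor seen above is an assumption.

import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.types.Row

// Hypothetical sketch: parses one comma-separated message into a Row
// whose width matches the given type information.
class CsvRowDeserializationSchema(typeInfo: TypeInformation[Row])
    extends DeserializationSchema[Row] {

  override def deserialize(message: Array[Byte]): Row = {
    val fields = new String(message, "UTF-8").split(",", -1)
    val row = new Row(typeInfo.getArity)
    for (i <- 0 until typeInfo.getArity) {
      // Missing trailing fields are filled with null
      row.setField(i, if (i < fields.length) fields(i) else null)
    }
    row
  }

  // The Kafka stream is unbounded, so never signal end-of-stream
  override def isEndOfStream(nextElement: Row): Boolean = false

  override def getProducedType: TypeInformation[Row] = typeInfo
}

With a schema like this, a message such as 460001234567,17,17 on topic "foo" becomes the row (460001234567, 17, 17); the map step then flags it with isSpecifiedLocation = true because lac equals cell, so it passes the final filter and is emitted as JSON on topic "bar".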