(五)flink的Table API 与 SQL-更新模式与kafka连接器案例
文章目录更新模式应用实例更新模式对于stream类型的Table数据,需要标记是由于INSERT,UPDATE,DELETE中哪种操作更新的数据,在Table API中通过Update Modes指定数据更新类型,通过指定不同的Update Modes确定哪种更新操作的数据与外部系统进行交互.connect(...).inAppendMode()//交互INSERT操作更新数据.in...
·
更新模式
对于stream类型的Table数据,需要标记是由于INSERT,UPDATE,DELETE中哪种操作更新的数据,在Table API中通过Update Modes指定数据更新类型,通过指定不同的Update Modes确定哪种更新操作的数据与外部系统进行交互
.connect(...)
.inAppendMode() //交互INSERT操作更新数据
.inUpsertMode() //INSERT,UPDATE,DELETE操作更新数据
.inRetractMode() //交互INSERT和DELETE操作更新数据
应用实例
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.descriptors.{Json, Kafka, Rowtime, Schema}
object ConnectorDemo {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tabEnv = StreamTableEnvironment.create(env)
tabEnv.connect(
new Kafka()
.version("0.10")
.topic("test")
.property("zookeeper.connect","note01:2181,note02:2181,note03:2181")
.property("bootstrap.servers","note01:9092,note02:9092,note03:9092")
)
//指定Table Format信息
.withFormat(
new Json()
.failOnMissingField(true)
.jsonSchema(
"""
|{
|type:'object',
|properties:{
|id:{
|type:'number'
|},
|name:{
|type:'string'
|},
|timestamp:{
|type:'string'
|format:'date-time'
|}
|}
|}
|""".stripMargin)
).withSchema(
new Schema()
.field("id",Types.INT)
.field("name",Types.STRING)
.field("rowtime",Types.SQL_TIMESTAMP)
.rowtime(
new Rowtime()
.timestampsFromField("timestamp")
.watermarksPeriodicBounded(60000) //1 minutes
)
)
//指定数据更新模式
.inAppendMode()
.registerTableSource("kafkaTable")
}
}
更多推荐
已为社区贡献8条内容
所有评论(0)