Flink 1.12 SQL Demo
Flink 版本 1.12.3；source 是 Kafka，维表是 MySQL，source 表 left join 维表（完整代码见下文 FlinkTableDemo）。
·
Flink 版本 1.12.3
Source 是 Kafka，维表是 MySQL；source 表 left join 维表。
/**
 * Flink SQL demo: reads a Kafka source table and enriches each row with a
 * name looked up from a MySQL (JDBC) dimension table via a processing-time
 * {@code LEFT JOIN ... FOR SYSTEM_TIME AS OF} join, then prints the result.
 */
public class FlinkTableDemo {

    /**
     * Registers the Kafka source table and the MySQL dimension table, runs the
     * join query and prints the result rows.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if table registration or query execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Kafka source table. The key column must be named `id` because the query
        // below selects k.id and joins on k.id = s.id (`int` is a reserved type
        // keyword and cannot be used as a bare column name).
        // `proctime` is a computed processing-time attribute used by the join.
        String sourceTable = "CREATE TABLE kafka_table " +
                "( " +
                " id int ," +
                " age int ," +
                " proctime as PROCTIME() " +
                ")WITH(" +
                " 'connector' = 'kafka' " +
                " ,'topic' = 'test' " +
                " ,'properties.bootstrap.servers' = 'XXXX' " +
                " ,'properties.group.id' = 'test_group_1' " +
                " ,'scan.startup.mode' = 'group-offsets' " +
                " ,'format' = 'canal-json' " +
                ")";

        // MySQL dimension (side) table, joined on `id`.
        // NOTE(review): this table uses the 'connector.type' style option keys while
        // the Kafka table uses the 'connector' key - confirm whether these should be
        // migrated to the same option style.
        String sideTable = "CREATE TABLE mysql_side " +
                "( " +
                " id int ," +
                " name varchar " +
                ") WITH ( " +
                " 'connector.type' = 'jdbc' " +
                " ,'connector.url' = 'XXXX' " +
                " ,'connector.table' = 'mysql_side' " +
                " ,'connector.driver' = 'com.mysql.jdbc.Driver' " +
                " ,'connector.username' = 'XXX' " +
                " ,'connector.password' = 'XXX' " +
                ")";

        tableEnv.executeSql(sourceTable);
        tableEnv.executeSql(sideTable);

        // Left join against the dimension table as of the source row's processing
        // time; rows without a match keep a null name.
        TableResult tableResult = tableEnv.executeSql(
                "select " +
                        " k.id" +
                        ",k.age" +
                        ",s.name " +
                        "from kafka_table k " +
                        "left join mysql_side FOR SYSTEM_TIME AS OF k.proctime s " +
                        "on k.id=s.id");

        // executeSql() has already submitted the job; print() consumes its results.
        // No env.execute() call here: no DataStream operators are registered on
        // `env`, so calling it would fail with "No operators defined in streaming
        // topology".
        tableResult.print();
    }
}
更多推荐
已为社区贡献7条内容
所有评论(0)