FLINK-SQL
关联MySQL中的维度表示例
/**
 * Reads order events from Kafka, enriches each record with its category name via a
 * processing-time temporal (lookup) join against a MySQL dimension table, and writes
 * the joined result into a MySQL sink table.
 *
 * <p>Pipeline: Kafka (csv) -> lookup join tb_mysql_category -> MySQL tb_orders03.
 */
public class MySQLDimensionDemo {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);
        // Source: read order events from Kafka using the Kafka connector.
        tableEnvironment.executeSql("CREATE TABLE KafkaTable (\n" +
                "  `oid` STRING, \n" +
                "  `cid` STRING, \n" +
                "  `money` DOUBLE, \n" +
                "  `ts` TIMESTAMP(3) METADATA FROM 'timestamp',\n" +
                // Computed column producing a processing-time attribute,
                // required by the FOR SYSTEM_TIME AS OF lookup join below.
                "  proctime as PROCTIME(),\n" +
                "  eventTime as ts, -- event time\n" +
                // Declare a watermark on eventTime with 5 seconds of allowed lateness.
                "  WATERMARK FOR eventTime as eventTime - INTERVAL '5' SECOND\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'order-csv',\n" +
                "  'properties.bootstrap.servers' = 'linux01:9092',\n" +
                "  'properties.group.id' = 'testGroup',\n" +
                "  'scan.startup.mode' = 'earliest-offset',\n" +
                "  'format' = 'csv',\n" +
                "  'csv.ignore-parse-errors' = 'true'\n" +
                ")");
        // Dimension table: JDBC connector with a lookup cache for the category data.
        tableEnvironment.executeSql("CREATE TABLE tb_mysql_category (\n" +
                "  id STRING,\n" +
                "  name STRING,\n" +
                "  PRIMARY KEY (id) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'jdbc',\n" +
                "  'url' = 'jdbc:mysql://localhost:3306/flink',\n" +
                "  'username' = 'root',\n" +
                "  'password' = 'root',\n" +
                "  'lookup.cache.max-rows' = '5000',\n" + // cache at most 5000 rows
                "  'lookup.cache.ttl' = '10min',\n" +     // max time-to-live of cached rows
                "  'table-name' = 'tb_category'\n" +
                ")\n");
        // Sink: write the joined rows into MySQL via the JDBC connector.
        // NOTE(review): credentials are hard-coded; they should come from configuration.
        tableEnvironment.executeSql("CREATE TABLE tb_mysql_orders (\n" +
                "  oid STRING,\n" +
                "  cid STRING,\n" +
                "  cname STRING,\n" +
                "  money DOUBLE,\n" +
                "  PRIMARY KEY (oid) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'jdbc',\n" +
                "  'url' = 'jdbc:mysql://localhost:3306/flink',\n" +
                "  'username' = 'root',\n" +
                // Fixed: was a leaked personal password, inconsistent with the 'root'
                // credentials used by every other JDBC table in these demos.
                "  'password' = 'root',\n" +
                "  'table-name' = 'tb_orders03'\n" +
                ")\n");
        // Processing-time lookup join: each Kafka row probes the dimension table
        // (or its cache) at the row's proctime; LEFT JOIN keeps unmatched orders.
        tableEnvironment.executeSql("INSERT INTO tb_mysql_orders " +
                "SELECT k.oid, k.cid, t.name as cname, k.money FROM KafkaTable as k\n" +
                "LEFT JOIN tb_mysql_category FOR SYSTEM_TIME AS OF k.proctime as t\n" +
                "ON k.cid = t.id");
    }
}
获取Kafka中元数据示例
/**
 * Reads user-behavior events from Kafka, exposes Kafka record metadata
 * (topic, partition, offset, timestamp) as table columns, concatenates them
 * into a single id string, and prints the result with the 'print' connector.
 */
public class KafkaConnectorMetadataDemo {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);
        tableEnvironment.executeSql("CREATE TABLE KafkaTable (\n" +
                "  `user_id` BIGINT,\n" +
                "  `item_id` BIGINT,\n" +
                "  `behavior` STRING,\n" +
                // Record timestamp is readable metadata; the others are VIRTUAL
                // (read-only, excluded when writing to this table).
                "  `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',\n" +
                "  `topic` STRING METADATA VIRTUAL, \n" +
                "  `partition` BIGINT METADATA VIRTUAL,\n" +
                "  `offset` BIGINT METADATA VIRTUAL \n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'user_behavior',\n" +
                "  'properties.bootstrap.servers' = 'linux01:9092',\n" +
                "  'properties.group.id' = 'testGroup',\n" +
                "  'scan.startup.mode' = 'earliest-offset',\n" +
                "  'format' = 'csv',\n" +
                "  'csv.ignore-parse-errors' = 'true'\n" +
                ")");
        // Sink: print rows to stdout for inspection.
        tableEnvironment.executeSql("CREATE TABLE print_table (\n" +
                "  `id` STRING,\n" +
                "  `user_id` BIGINT,\n" +
                "  `item_id` BIGINT,\n" +
                "  `behavior` STRING\n" +
                ") WITH (\n" +
                "  'connector' = 'print'\n" +
                ")");
        // Build a synthetic id "topic-partition-offset-yyyy-MM-dd" from the metadata
        // columns; `partition` and `offset` are reserved words and must stay quoted.
        tableEnvironment.executeSql("INSERT INTO print_table " +
                "SELECT concat_ws('-',topic,CAST(`partition` AS STRING),CAST(`offset` AS STRING),DATE_FORMAT(event_time, 'yyyy-MM-dd')),user_id,item_id,behavior " +
                "FROM KafkaTable");
    }
}
写入MySQL的两种方式 (追加,覆盖)
/**
 * Reads JSON order events from Kafka and appends them into a MySQL table.
 *
 * <p>The sink table has no primary key, so the JDBC connector operates in
 * append mode: every incoming row becomes a plain INSERT.
 */
public class KafkaToMySQLDemo01 {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);
        // Source: read JSON order events from Kafka.
        tableEnvironment.executeSql("CREATE TABLE tb_user_order (\n" +
                "  `oid` STRING, \n" +
                "  `cid` STRING, \n" +
                "  `money` DOUBLE \n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'order-json',\n" +
                // Fixed: the broker list repeated the same host twice.
                "  'properties.bootstrap.servers' = 'linux01:9092',\n" +
                "  'properties.group.id' = 'testGroup',\n" +
                "  'scan.startup.mode' = 'earliest-offset',\n" +
                "  'format' = 'json',\n" +
                "  'json.ignore-parse-errors' = 'true' \n" + // skip malformed JSON records
                ")");
        // Sink: append rows into MySQL via the JDBC connector (no primary key).
        tableEnvironment.executeSql("CREATE TABLE tb_mysql_orders (\n" +
                "  `oid` STRING, \n" +
                "  `cid` STRING, \n" +
                "  `money` DOUBLE \n" +
                ") WITH (\n" +
                "  'connector' = 'jdbc',\n" +
                "  'url' = 'jdbc:mysql://localhost:3306/flink',\n" +
                "  'username' = 'root',\n" +
                "  'password' = 'root',\n" +
                "  'table-name' = 'tb_orders'\n" +
                ")");
        // Continuously insert the Kafka rows into MySQL.
        tableEnvironment.executeSql("INSERT INTO tb_mysql_orders " +
                "SELECT oid,cid,money from tb_user_order");
    }
}
/**
 * Reads JSON order events from Kafka and upserts them into a MySQL table.
 *
 * <p>Because the sink table declares PRIMARY KEY (oid), the JDBC connector
 * operates in upsert mode: rows with a duplicate key overwrite the existing
 * row instead of being appended.
 */
public class KafkaToMySQLDemo02 {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);
        // Source: read JSON order events from Kafka.
        tableEnvironment.executeSql("CREATE TABLE tb_user_order (\n" +
                "  `oid` STRING, \n" +
                "  `cid` STRING, \n" +
                "  `money` DOUBLE \n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'order-json02',\n" +
                // Fixed: the broker list repeated the same host twice.
                "  'properties.bootstrap.servers' = 'linux01:9092',\n" +
                "  'properties.group.id' = 'testGroup',\n" +
                "  'scan.startup.mode' = 'earliest-offset',\n" +
                "  'format' = 'json',\n" +
                "  'json.ignore-parse-errors' = 'true' \n" + // skip malformed JSON records
                ")");
        // Sink: upsert into MySQL — the primary key turns inserts into upserts.
        tableEnvironment.executeSql("CREATE TABLE tb_mysql_orders (\n" +
                "  `oid` STRING, \n" +
                "  `cid` STRING, \n" +
                "  `money` DOUBLE, \n" +
                // Fixed: a missing newline here ran "NOT ENFORCED) WITH (" together
                // on one line of the generated DDL.
                "  PRIMARY KEY (oid) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'jdbc',\n" +
                "  'url' = 'jdbc:mysql://localhost:3306/flink',\n" +
                "  'username' = 'root',\n" +
                "  'password' = 'root',\n" +
                "  'table-name' = 'tb_orders02'\n" +
                ")");
        // Continuously upsert the Kafka rows into MySQL.
        tableEnvironment.executeSql("INSERT INTO tb_mysql_orders " +
                "SELECT oid,cid,money from tb_user_order");
    }
}
更多推荐
已为社区贡献3条内容
所有评论(0)