flink1.11.2中Table & SQL连接kafka
#flink连接kafka并读取数据前提条件flink版本1.11.2直接上代码import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.api.common.restartstrategy.RestartStrategies;import org.apache.flink.stream
·
#flink连接kafka并读取数据
前提条件
flink版本1.11.2
直接上代码
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
public class TestMain {
protected static StreamExecutionEnvironment env;
protected static StreamTableEnvironment tEnv;
protected static int ff = 0;
public static void main(String[] args) throws Exception {
env = StreamExecutionEnvironment.getExecutionEnvironment();
tEnv = StreamTableEnvironment.create(
env,
EnvironmentSettings.newInstance()
// Watermark is only supported in blink planner
.useBlinkPlanner()
.inStreamingMode()
.build()
);
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
env.enableCheckpointing(1000);
env.setParallelism(1);
String createTable = String.format(
//"CREATE TABLE UserScores (type STRING, orderNo STRING,productName STRING,money FLOAT,name STRING,zoneCode STRING,zoneName STRING)\n" +
"CREATE TABLE UserScores (name STRING)\n" +
"WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'nima',\n" +
" 'properties.bootstrap.servers' = '192.168.120.130:9092',\n" +
" 'properties.group.id' = 'testGroup',\n" +
" 'format' = 'json',\n" +
//" 'scan.startup.timestamp-millis' = '1605147648000',\n" +
// " 'csv.field-delimiter' = '\t',\n" +
" 'scan.startup.mode' = 'group-offsets'\n" +
")");
TableResult tableResult = tEnv.executeSql(createTable);
Table table = tEnv.sqlQuery("SELECT * FROM UserScores");
DataStream<Row> infoDataStream1 = tEnv.toAppendStream(table, Row.class);
infoDataStream1.print();
}
}
读取hive的代码
具体说明参考官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
更多推荐
已为社区贡献2条内容
所有评论(0)