Flink Table API：Kafka Connector 的使用
public class TableSchemaTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();...
·
/**
 * Minimal example that registers a Kafka topic as a Flink table source through the
 * descriptor-based {@code connect()} API, queries it with SQL and prints every row.
 */
public class TableSchemaTest {

    /**
     * Builds the streaming job: Kafka source -> JSON format -> nested-row schema ->
     * registered table {@code server} -> {@code select *} -> printed append stream.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Connector: Kafka 0.10, topic "logdata", consumed from the earliest offset.
        Kafka kafkaSource = new Kafka()
                .version("0.10")
                .topic("logdata")
                .startFromEarliest()
                .property("bootstrap.servers", "127.0.0.1:9092")
                .property("group.id", "server");

        // Format: JSON that does not fail on missing fields; the format schema is
        // derived from the table schema declared below.
        Json jsonFormat = new Json().failOnMissingField(false).deriveSchema();

        // Schema: three top-level fields, each a nested row with one STRING field.
        //
        // Dataset names listed by the original author (presumably the values that
        // event.dataset can take - confirm against the producer):
        //   docker.container docker.network docker.memory docker.diskio docker.cpu
        //   system.cpu system.network system.diskio system.process
        Schema tableSchema = new Schema()
                .field("host", singleStringFieldRow("name"))
                .field("metricset", singleStringFieldRow("name"))
                .field("event", singleStringFieldRow("dataset"));

        tableEnv.connect(kafkaSource)
                .withFormat(jsonFormat)
                .withSchema(tableSchema)
                .inAppendMode()
                .registerTableSource("server");

        Table allRows = tableEnv.sqlQuery("select * from server ");
        tableEnv.toAppendStream(allRows, Row.class).print();

        env.execute();
    }

    /**
     * Creates a ROW type that contains exactly one STRING field.
     *
     * @param fieldName name of the single nested field
     * @return type information describing {@code ROW<fieldName STRING>}
     */
    private static TypeInformation<?> singleStringFieldRow(String fieldName) {
        return Types.ROW(new String[]{fieldName}, new TypeInformation[]{Types.STRING()});
    }
}
更多推荐
已为社区贡献2条内容
所有评论(0)