/**
 * Runnable example that registers a Kafka-backed JSON table source whose columns are
 * nested rows, then prints every row returned by a {@code select *} over that table.
 */
public class TableSchemaTest {

    /**
     * Wires up the job and runs it.
     *
     * <p>The source is declared through the descriptor API: a Kafka connector
     * (topic {@code logdata}, starting from the earliest offset), a JSON format whose
     * schema is derived from the table schema, and three columns that are each a
     * one-field row of type STRING. The table is registered as {@code server} in append
     * mode, queried, converted to an append stream of {@link Row} and printed.
     *
     * @param args command-line arguments; not read
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        Kafka kafkaConnector = new Kafka()
                .version("0.10")
                .topic("logdata")
                .startFromEarliest()
                .property("bootstrap.servers", "127.0.0.1:9092")
                .property("group.id", "server");

        // Missing JSON fields do not fail the job; the JSON schema follows the table schema.
        Json jsonFormat = new Json()
                .failOnMissingField(false)
                .deriveSchema();

        // Author's note on "dataset" values:
        //   docker.container  docker.network  docker.memory  docker.diskio  docker.cpu
        //   system.cpu  system.network  system.diskio  system.process
        Schema nestedSchema = new Schema()
                .field("host", singleStringFieldRow("name"))
                .field("metricset", singleStringFieldRow("name"))
                .field("event", singleStringFieldRow("dataset"));

        tableEnv.connect(kafkaConnector)
                .withFormat(jsonFormat)
                .withSchema(nestedSchema)
                .inAppendMode()
                .registerTableSource("server");

        Table allRows = tableEnv.sqlQuery("select  *  from server   ");
        tableEnv.toAppendStream(allRows, Row.class).print();
        env.execute();
    }

    /**
     * Builds the type information for a row that has exactly one STRING field.
     *
     * @param fieldName name of the row's only field
     * @return row type information with a single STRING field named {@code fieldName}
     */
    private static TypeInformation<Row> singleStringFieldRow(String fieldName) {
        String[] fieldNames = {fieldName};
        TypeInformation<?>[] fieldTypes = {Types.STRING()};
        return Types.ROW(fieldNames, fieldTypes);
    }
}

 

Logo

Kafka开源项目指南提供详尽教程，助开发者掌握其架构、配置和使用，实现高效数据流管理和实时处理。它高性能、可扩展，适合日志收集和实时数据处理，通过持久化保障数据安全，是企业大数据生态系统的核心。

更多推荐