Flink读取嵌套Json
The complete, runnable example below reads nested JSON records from Kafka with the Flink Table API.
package kafka;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;
/**
 * Sample input record:
 * {"userId":"1","day":"2020-01-05","data":[{"package":"com.zyd","activetime":"2311"}]}
 */
/**
 * Reads nested JSON records from a Kafka topic via the (legacy) Flink Table API
 * descriptor classes and prints every row as a retract stream.
 *
 * <p>Each record is a top-level object with "userId" (long), "day" (string) and a
 * "data" field holding a JSON array of {package, activetime} objects — see the
 * sample record in the file header. The nested array is modeled as an array of
 * ROW(package STRING, activetime LONG) via {@link ObjectArrayTypeInfo}.
 */
public class KafkaConnectDemo {
    // Declares `throws Exception` instead of catch-and-printStackTrace: the original
    // swallowed job-submission failures and let main() exit as if nothing went wrong.
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        tableEnv.connect(new Kafka()
                .version("0.10")
                .topic("test")
                .startFromLatest()
                .property("group.id", "g1")
                .property("bootstrap.servers", "note01:9092,note02:9092,note03:9092")
        ).withFormat(
                new Json()
                        .failOnMissingField(true)
                        // Derive the JSON parsing schema from the table schema below.
                        .deriveSchema()
        ).withSchema(
                new Schema()
                        .field("userId", Types.LONG())
                        .field("day", Types.STRING())
                        // "data" is a nested JSON array: map it to Row[] with named,
                        // typed fields so SQL can address data[...].package etc.
                        .field("data", ObjectArrayTypeInfo.getInfoFor(
                                Row[].class,
                                Types.ROW(
                                        new String[]{"package", "activetime"},
                                        new TypeInformation[]{Types.STRING(), Types.LONG()}
                                )
                        ))
        ).inAppendMode().registerTableSource("userlog");

        Table table = tableEnv.sqlQuery("select * from userlog");
        tableEnv.toRetractStream(table, Row.class).print();
        env.execute();
    }
}
Result (sample console output):
11> (true,1,2020-01-05,[com.zyd,2311])
更多推荐
已为社区贡献8条内容
所有评论(0)