Connecting Flink SQL to Kafka
Create the Kafka topic
kafka-topics --zookeeper test:2181 --topic test --partitions 1 --replication-factor 1 --create
Generate test data (each line typed into the console producer is sent as one message)
kafka-console-producer --broker-list test:9092 --topic test
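Typing rows by hand works, but a small generator makes the test repeatable. The following is a minimal sketch; the helper name `gen_test_rows` and the `row-N` values are made up for illustration and are not part of Kafka or Flink. Each printed line has no comma, so it should parse as a single CSV field, which matches a one-column table.

```shell
# Hypothetical helper: print N single-field rows, one per line.
# Each line becomes one Kafka message when piped to the console producer.
gen_test_rows() {
  count="${1:-5}"   # number of rows, defaults to 5
  i=1
  while [ "$i" -le "$count" ]; do
    printf 'row-%d\n' "$i"
    i=$((i + 1))
  done
}

# Usage, assuming the same broker address as in the command above:
# gen_test_rows 10 | kafka-console-producer --broker-list test:9092 --topic test
```

The pipe is left commented out so the snippet can be sourced without a running broker.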
Start the Flink SQL client with the Kafka connector jar
./sql-client.sh embedded --jar /home/<username>/flink-sql-connector-kafka_2.11-1.11.0.jar
Create a table backed by the Kafka topic
CREATE TABLE kafkaTable (
  a STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'test',
  'properties.bootstrap.servers' = 'test-4:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'csv',
  'scan.startup.mode' = 'earliest-offset'
);
Run the query
select * from kafkaTable;
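If the query returns nothing, it can help to confirm that the topic actually contains messages. The following is a rough sketch, assuming the same `test` topic and `test:9092` broker as the producer command; the wrapper name `peek_topic` is hypothetical. It reads from the beginning of the topic, which should roughly mirror what `'scan.startup.mode' = 'earliest-offset'` asks Flink to do.

```shell
# Hypothetical wrapper around the Kafka console consumer.
# Arguments: topic (default: test), broker (default: test:9092),
# maximum number of messages to print (default: 10).
peek_topic() {
  topic="${1:-test}"
  broker="${2:-test:9092}"
  limit="${3:-10}"
  kafka-console-consumer \
    --bootstrap-server "$broker" \
    --topic "$topic" \
    --from-beginning \
    --max-messages "$limit"
}

# Usage: peek_topic test test:9092 5
```

Without enough messages in the topic, the consumer keeps waiting until interrupted with Ctrl+C.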