Reading Kafka Data with the Flink SQL Client and Writing It to Hive
Versions:
- Flink 1.11.1
- Kafka 2.4.0
- Hive 2.3.6
- Hadoop 2.7.3
Detailed steps:
- Prepare the required JAR files
hive-exec-2.3.6.jar
hive-metastore-2.3.6.jar
libfb303-0.9.3.jar
kafka-clients-2.4.0.jar
flink-sql-connector-hive-2.3.6_2.11-1.11.1.jar
flink-sql-connector-kafka_2.11-1.11.1.jar
flink-shaded-hadoop2-uber-2.7.5-1.8.3.jar
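As a quick sanity check, the sketch below reports which of the JAR files listed above are missing from a directory. It assumes the JARs are meant to be copied into $FLINK_HOME/lib (the text does not name the target directory), and check_jars is a hypothetical helper name, not part of Flink.

```shell
# Print every JAR from the list above that is missing in the given directory.
# Returns 0 when all JARs are present, 1 otherwise.
check_jars() {
  local lib_dir missing jar
  lib_dir="$1"
  missing=0
  for jar in \
    hive-exec-2.3.6.jar \
    hive-metastore-2.3.6.jar \
    libfb303-0.9.3.jar \
    kafka-clients-2.4.0.jar \
    flink-sql-connector-hive-2.3.6_2.11-1.11.1.jar \
    flink-sql-connector-kafka_2.11-1.11.1.jar \
    flink-shaded-hadoop2-uber-2.7.5-1.8.3.jar
  do
    if [ ! -f "$lib_dir/$jar" ]; then
      echo "missing: $jar"
      missing=1
    fi
  done
  return "$missing"
}

# Usage (assumed target directory):
# check_jars "$FLINK_HOME/lib"
```

Running the check before starting the cluster can surface a misspelled or forgotten JAR earlier than a ClassNotFoundException in the SQL Client would.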
- Modify the configuration files
Edit $FLINK_HOME/conf/sql-client-defaults.yaml. Two sections need to change:
catalogs:
  - name: myhive
    type: hive
    hive-conf-dir: /vm_data/apps/apache-hive-2.3.6-bin/conf
    default-database: default
execution:
  current-catalog: myhive
Edit $FLINK_HOME/bin/start-cluster.sh and add the Hadoop classpath:
export HADOOP_CLASSPATH=`hadoop classpath`
Edit $HIVE_HOME/conf/hive-site.xml:
<property>
  <name>hive.metastore.uris</name>
  <value>thrift://hadoop000:9083</value>
</property>
- Start the Flink SQL Client
The Flink standalone cluster must be started first:
$FLINK_HOME/bin/start-cluster.sh
$FLINK_HOME/bin/sql-client.sh embedded
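- In the Flink SQL Client, create a table in the Hive catalog with Kafka as its data source
The configuration file sets default as the default database, so the table is created there.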
CREATE TABLE student(
  id INT,
  name STRING,
  password STRING,
  age INT,
  ts BIGINT,
  eventTime AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000, 'yyyy-MM-dd HH:mm:ss')), -- event time (ts / 1000 converts epoch milliseconds to seconds)
  WATERMARK FOR eventTime AS eventTime - INTERVAL '10' SECOND -- watermark
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal', -- Kafka connector version; must be universal, not 2.4.0, otherwise an error is raised
  'connector.topic' = 'student', -- topic to consume
  'connector.startup-mode' = 'latest-offset', -- starting offset position
  'connector.properties.zookeeper.connect' = 'hadoop000:2181',
  'connector.properties.bootstrap.servers' = 'hadoop000:9092',
  'connector.properties.group.id' = 'student_1',
  'format.type' = 'json',
  'format.derive-schema' = 'true', -- derive the JSON parsing schema from the table schema
  'update-mode' = 'append'
);
- Start Kafka and send data
$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list hadoop000:9092 --topic student
{"id":12, "name":"kevin", "password":"wong", "age":22, "ts":1603769073}
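The table definition computes eventTime from ts / 1000, which suggests ts is expected in epoch milliseconds, whereas the sample message above carries a 10-digit value in seconds. As a hedged sketch, the hypothetical helper below (make_student_json is not part of Kafka or Flink) builds a message with the same fields and a millisecond timestamp taken from the current time. It does not escape special characters in the string fields.

```shell
# Print one student record as a single-line JSON message.
# "ts" is the current time in epoch milliseconds (an assumption based on
# the ts / 1000 expression in the table definition).
make_student_json() {
  # $1=id  $2=name  $3=password  $4=age
  local ts_ms
  ts_ms=$(( $(date +%s) * 1000 ))
  printf '{"id":%d, "name":"%s", "password":"%s", "age":%d, "ts":%d}\n' \
    "$1" "$2" "$3" "$4" "$ts_ms"
}

# Usage (same broker address and topic as in the command above):
# make_student_json 12 kevin wong 22 |
#   "$KAFKA_HOME/bin/kafka-console-producer.sh" --broker-list hadoop000:9092 --topic student
```

Piping the output into the console producer sends exactly one message, which makes it easy to script repeated test records.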
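- Query the table from the Flink SQL Client
SELECT * FROM student;
Because the table uses the latest-offset startup mode, only messages sent after the query has started will show up in the result.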